vidformer

  1"""
  2vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer).
  3
  4**Quick links:**
  5* [📦 PyPI](https://pypi.org/project/vidformer/)
  6* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/)
  7* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html)
  8* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html)
  9* [🧑‍💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/)
 10"""
 11
 12__version__ = "1.2.2"
 13
 14
 15import base64
 16import gzip
 17import json
 18import struct
 19import time
 20from fractions import Fraction
 21from urllib.parse import urlparse
 22
 23import requests
 24
 25_in_notebook = False
 26try:
 27    from IPython import get_ipython
 28
 29    if "IPKernelApp" in get_ipython().config:
 30        _in_notebook = True
 31except Exception:
 32    pass
 33
 34
 35def _wait_for_url(url, max_attempts=150, delay=0.1):
 36    for attempt in range(max_attempts):
 37        try:
 38            response = requests.get(url)
 39            if response.status_code == 200:
 40                return response.text.strip()
 41            else:
 42                time.sleep(delay)
 43        except requests.exceptions.RequestException:
 44            time.sleep(delay)
 45    return None
 46
 47
 48def _play(namespace, hls_video_url, hls_js_url, method="display", status_url=None):
 49    # The namespace is so multiple videos in one tab don't conflict
 50
 51    if method == "display":
 52        from IPython.display import display
 53
 54        display(
 55            _play(
 56                namespace,
 57                hls_video_url,
 58                hls_js_url,
 59                method="html",
 60                status_url=status_url,
 61            )
 62        )
 63        return
 64    if method == "html":
 65        from IPython.display import HTML
 66
 67        if not status_url:
 68            html_code = f"""
 69<!DOCTYPE html>
 70<html>
 71<head>
 72    <title>HLS Video Player</title>
 73    <!-- Include hls.js library -->
 74    <script src="{hls_js_url}"></script>
 75</head>
 76<body>
 77    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
 78    <script>
 79        var video = document.getElementById('video-{namespace}');
 80        var videoSrc = '{hls_video_url}';
 81
 82        if (Hls.isSupported()) {{
 83            var hls = new Hls();
 84            hls.loadSource(videoSrc);
 85            hls.attachMedia(video);
 86            hls.on(Hls.Events.MANIFEST_PARSED, function() {{
 87                video.play();
 88            }});
 89        }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
 90            video.src = videoSrc;
 91            video.addEventListener('loadedmetadata', function() {{
 92                video.play();
 93            }});
 94        }} else {{
 95            console.error('This browser does not appear to support HLS.');
 96        }}
 97    </script>
 98</body>
 99</html>
100"""
101            return HTML(data=html_code)
102        else:
103            html_code = f"""
104<!DOCTYPE html>
105<html>
106<head>
107    <title>HLS Video Player</title>
108    <script src="{hls_js_url}"></script>
109</head>
110<body>
111    <div id="container-{namespace}"></div>
112    <script>
113        var statusUrl = '{status_url}';
114        var videoSrc = '{hls_video_url}';
115        var videoNamespace = '{namespace}';
116
117        function showWaiting() {{
118            document.getElementById('container-{namespace}').textContent = 'Waiting...';
119            pollStatus();
120        }}
121
122        function pollStatus() {{
123            setTimeout(function() {{
124                fetch(statusUrl)
125                    .then(r => r.json())
126                    .then(res => {{
127                        if (res.ready) {{
128                            document.getElementById('container-{namespace}').textContent = '';
129                            attachHls();
130                        }} else {{
131                            pollStatus();
132                        }}
133                    }})
134                    .catch(e => {{
135                        console.error(e);
136                        pollStatus();
137                    }});
138            }}, 250);
139        }}
140
141        function attachHls() {{
142            var container = document.getElementById('container-{namespace}');
143            container.textContent = '';
144            var video = document.createElement('video');
145            video.id = 'video-' + videoNamespace;
146            video.controls = true;
147            video.width = 640;
148            video.height = 360;
149            container.appendChild(video);
150            if (Hls.isSupported()) {{
151                var hls = new Hls();
152                hls.loadSource(videoSrc);
153                hls.attachMedia(video);
154                hls.on(Hls.Events.MANIFEST_PARSED, function() {{
155                    video.play();
156                }});
157            }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
158                video.src = videoSrc;
159                video.addEventListener('loadedmetadata', function() {{
160                    video.play();
161                }});
162            }}
163        }}
164
165        fetch(statusUrl)
166            .then(r => r.json())
167            .then(res => {{
168                if (res.ready) {{
169                    attachHls();
170                }} else {{
171                    showWaiting();
172                }}
173            }})
174            .catch(e => {{
175                console.error(e);
176                showWaiting();
177            }});
178    </script>
179</body>
180</html>
181"""
182        return HTML(data=html_code)
183    elif method == "link":
184        return hls_video_url
185    else:
186        raise ValueError("Invalid method")
187
188
def _feb_expr_coded_as_scalar(expr) -> bool:
    """Tell whether *expr* is written inline rather than referenced by pointer.

    Returns ``False`` for a ``FilterExpr`` and for any tuple/list that has
    more than three members or contains something other than an ``int`` in
    the range ``[-2**15, 2**15)``. Every other accepted value yields ``True``.
    """
    kind = type(expr)
    if kind is FilterExpr:
        return False
    if kind is tuple or kind is list:
        # Tuples are treated exactly like lists.
        if len(expr) > 3:
            return False
        return all(type(x) is int and -(2**15) <= x < 2**15 for x in expr)
    assert kind in [int, float, str, bytes, SourceExpr, bool, list]
    return True
202
203
class _FrameExpressionBlock:
    """Accumulates frame expressions into flat tables for ``as_dict()``.

    Expressions are stored as integer words in ``_exprs``. The top byte of a
    word is a tag and the low bits carry the payload:

    * no tag   - int in ``[-2**31, 2**31)``, masked to 32 bits
    * ``0x01`` - bool
    * ``0x02`` - float, as its 32-bit IEEE-754 bit pattern
    * ``0x03``..``0x06`` - list of 0..3 ints in ``[-2**15, 2**15)``,
      16 bits per member
    * ``0x40`` - index into ``_literals``
    * ``0x41`` - filter call header (arg count, kwarg count, function index)
    * ``0x42`` - list header (member count)
    * ``0x43`` / ``0x44`` - source frame by iloc / by index into ``_source_fracs``
    * ``0x45`` - pointer to an earlier word in ``_exprs``
    * ``0x46`` - index into ``_kwarg_keys``
    """

    def __init__(self):
        self._functions = []
        self._literals = []
        self._sources = []
        self._kwarg_keys = []
        self._source_fracs = []
        self._exprs = []
        self._frame_exprs = []

    def __len__(self):
        """Return the number of frames inserted with ``insert_frame``."""
        return len(self._frame_exprs)

    @staticmethod
    def _intern(table, value):
        """Return the index of *value* in *table*, appending it if absent."""
        try:
            return table.index(value)
        except ValueError:
            table.append(value)
            return len(table) - 1

    @staticmethod
    def _is_i16(value):
        """Return True for an ``int`` (not ``bool``) in ``[-2**15, 2**15)``."""
        return type(value) is int and -(2**15) <= value < 2**15

    def _push(self, word):
        """Append *word* to ``_exprs`` and return its index."""
        self._exprs.append(word)
        return len(self._exprs) - 1

    def _push_literal(self, data):
        """Store *data* in ``_literals`` and push a word that references it."""
        self._literals.append(_json_arg(data, skip_data_anot=True))
        return self._push(0x40000000_00000000 | (len(self._literals) - 1))

    def insert_expr(self, expr):
        """Insert a frame or data expression and return its index in ``_exprs``."""
        if type(expr) is SourceExpr or type(expr) is FilterExpr:
            return self.insert_frame_expr(expr)
        return self.insert_data_expr(expr)

    def insert_data_expr(self, data):
        """Insert a data value and return the index of its (first) word.

        Accepts bool, int, float, str, bytes, and lists/tuples of those.

        Raises:
            Exception: If *data* is of any other type.
        """
        if type(data) is tuple:
            data = list(data)

        if type(data) is bool:
            return self._push(0x01000000_00000000 | int(data))
        if type(data) is int:
            if -(2**31) <= data < 2**31:
                return self._push(data & 0xFFFFFFFF)
            # Too wide for an inline word: store it as a literal.
            return self._push_literal(data)
        if type(data) is float:
            # Explicit big-endian pack/unpack gives the float32 bit pattern
            # regardless of host byte order and of the Python version.
            (bits,) = struct.unpack(">I", struct.pack(">f", data))
            return self._push(0x02000000_00000000 | bits)
        if type(data) is str or type(data) is bytes:
            return self._push_literal(data)
        if type(data) is not list:
            raise Exception("Invalid data type")

        if len(data) <= 3 and all(self._is_i16(x) for x in data):
            # Short list of small ints: tag is 0x03 + length, members are
            # packed 16 bits each with the first member highest.
            packed = 0
            for x in data:
                packed = (packed << 16) | (x & 0xFFFF)
            return self._push(((0x03 + len(data)) << 56) | packed)

        # General list: members that are not written inline are inserted
        # first so the header can be followed by pointers to them.
        member_idxs = [
            None if _feb_expr_coded_as_scalar(member) else self.insert_data_expr(member)
            for member in data
        ]
        out = self._push(0x42000000_00000000 | len(data))
        for member, idx in zip(data, member_idxs):
            if idx is None:
                self.insert_data_expr(member)
            else:
                self._exprs.append(0x45000000_00000000 | idx)
        return out

    def insert_frame_expr(self, frame):
        """Insert a ``SourceExpr`` or ``FilterExpr`` and return its index.

        Raises:
            Exception: If *frame* is neither of those types.
        """
        if type(frame) is SourceExpr:
            source_idx = self._intern(self._sources, frame._source._name)
            if frame._is_iloc:
                return self._push(
                    0x43000000_00000000 | (source_idx << 32) | frame._idx
                )
            # Timestamps are stored as numerator/denominator pairs.
            frac_idx = len(self._source_fracs) // 2
            self._source_fracs.append(frame._idx.numerator)
            self._source_fracs.append(frame._idx.denominator)
            return self._push(0x44000000_00000000 | (source_idx << 32) | frac_idx)

        if type(frame) is not FilterExpr:
            raise Exception("Invalid frame type")

        func_idx = self._intern(self._functions, frame._filter._func)
        len_args = len(frame._args)
        len_kwargs = len(frame._kwargs)

        # Insert non-inline arguments ahead of the call header.
        arg_idxs = [
            None if _feb_expr_coded_as_scalar(arg) else self.insert_expr(arg)
            for arg in frame._args
        ]
        kwarg_idxs = {
            k: None if _feb_expr_coded_as_scalar(v) else self.insert_expr(v)
            for k, v in frame._kwargs.items()
        }

        out_idx = self._push(
            0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx
        )
        for arg, idx in zip(frame._args, arg_idxs):
            if idx is None:
                # Inline value
                self.insert_expr(arg)
            else:
                # Pointer to a previously inserted expression
                self._exprs.append(0x45000000_00000000 | idx)
        for k, v in frame._kwargs.items():
            k_idx = self._intern(self._kwarg_keys, k)
            self._exprs.append(0x46000000_00000000 | k_idx)
            if kwarg_idxs[k] is None:
                self.insert_expr(v)
            else:
                self._exprs.append(0x45000000_00000000 | kwarg_idxs[k])
        return out_idx

    def insert_frame(self, frame):
        """Insert *frame* and record it as the next output frame."""
        idx = self.insert_frame_expr(frame)
        self._frame_exprs.append(idx)

    def as_dict(self):
        """Return the accumulated tables as a dict of lists."""
        return {
            "functions": self._functions,
            "literals": self._literals,
            "sources": self._sources,
            "kwarg_keys": self._kwarg_keys,
            "source_fracs": self._source_fracs,
            "exprs": self._exprs,
            "frame_exprs": self._frame_exprs,
        }
396
397
class Source:
    """A video source description: name, frame format and frame timestamps.

    Index with a ``Fraction`` to reference a frame by timestamp, or use
    ``.iloc[i]`` to reference one by position.
    """

    def __init__(self, id: str, src):
        """Build from *id* and a mapping with ``width``, ``height``,
        ``pix_fmt`` and ``ts`` (pairs turned into ``Fraction`` objects)."""
        self._name = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._ts = [Fraction(pair[0], pair[1]) for pair in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source name."""
        return self._name

    def fmt(self):
        """Return a copy of the format dict."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        """Return the number of timestamps."""
        return len(self._ts)

    def __getitem__(self, idx):
        """Return a timestamp-addressed ``SourceExpr``; *idx* must be a ``Fraction``."""
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return "Source({})".format(self._name)
427        return f"Source({self._name})"
428
429
class Spec:
    """A spec description: id, frame format and VOD endpoint."""

    def __init__(self, id: str, src):
        """Build from *id* and a mapping with ``width``, ``height``,
        ``pix_fmt`` and ``vod_endpoint``."""
        self._id = id
        self._fmt = {
            "width": src["width"],
            "height": src["height"],
            "pix_fmt": src["pix_fmt"],
        }
        self._vod_endpoint = src["vod_endpoint"]
        # hls.js is addressed at the root of the VOD endpoint's host.
        origin = urlparse(self._vod_endpoint)
        self._hls_js_url = f"{origin.scheme}://{origin.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec id."""
        return self._id

    def play(self, method):
        """Hand the playlist and status URLs under the VOD endpoint to ``_play``."""
        return _play(
            self._id,
            f"{self._vod_endpoint}playlist.m3u8",
            self._hls_js_url,
            method=method,
            status_url=f"{self._vod_endpoint}status",
        )
448        hls_js_url = self._hls_js_url
449        return _play(self._id, url, hls_js_url, method=method, status_url=status_url)
450
451
452class Server:
453    def __init__(
454        self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None
455    ):
456        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
457            raise Exception("Endpoint must start with http:// or https://")
458        if endpoint.endswith("/"):
459            raise Exception("Endpoint must not end with /")
460        self._endpoint = endpoint
461
462        self._api_key = api_key
463        self._session = requests.Session()
464        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
465        response = self._session.get(
466            f"{self._endpoint}/v2/auth",
467            headers={"Authorization": f"Bearer {self._api_key}"},
468        )
469        if not response.ok:
470            raise Exception(response.text)
471        response = response.json()
472        assert response["status"] == "ok"
473        self._vod_only = vod_only
474        self._cv2_writer_init_callback = cv2_writer_init_callback
475
476    def is_vod_only(self) -> bool:
477        return self._vod_only
478
479    def cv2_writer_init_callback(self):
480        return self._cv2_writer_init_callback
481
482    def get_source(self, id: str) -> Source:
483        assert type(id) is str
484        response = self._session.get(
485            f"{self._endpoint}/v2/source/{id}",
486            headers={"Authorization": f"Bearer {self._api_key}"},
487        )
488        if not response.ok:
489            raise Exception(response.text)
490        response = response.json()
491        return Source(response["id"], response)
492
493    def list_sources(self) -> list[str]:
494        response = self._session.get(
495            f"{self._endpoint}/v2/source",
496            headers={"Authorization": f"Bearer {self._api_key}"},
497        )
498        if not response.ok:
499            raise Exception(response.text)
500        response = response.json()
501        return response
502
503    def delete_source(self, id: str):
504        assert type(id) is str
505        response = self._session.delete(
506            f"{self._endpoint}/v2/source/{id}",
507            headers={"Authorization": f"Bearer {self._api_key}"},
508        )
509        if not response.ok:
510            raise Exception(response.text)
511        response = response.json()
512        assert response["status"] == "ok"
513
514    def search_source(
515        self, name, stream_idx, storage_service, storage_config
516    ) -> list[str]:
517        assert type(name) is str
518        assert type(stream_idx) is int
519        assert type(storage_service) is str
520        assert type(storage_config) is dict
521        for k, v in storage_config.items():
522            assert type(k) is str
523            assert type(v) is str
524        req = {
525            "name": name,
526            "stream_idx": stream_idx,
527            "storage_service": storage_service,
528            "storage_config": storage_config,
529        }
530        response = self._session.post(
531            f"{self._endpoint}/v2/source/search",
532            json=req,
533            headers={"Authorization": f"Bearer {self._api_key}"},
534        )
535        if not response.ok:
536            raise Exception(response.text)
537        response = response.json()
538        return response
539
540    def create_source(
541        self, name, stream_idx, storage_service, storage_config
542    ) -> Source:
543        assert type(name) is str
544        assert type(stream_idx) is int
545        assert type(storage_service) is str
546        assert type(storage_config) is dict
547        for k, v in storage_config.items():
548            assert type(k) is str
549            assert type(v) is str
550        req = {
551            "name": name,
552            "stream_idx": stream_idx,
553            "storage_service": storage_service,
554            "storage_config": storage_config,
555        }
556        response = self._session.post(
557            f"{self._endpoint}/v2/source",
558            json=req,
559            headers={"Authorization": f"Bearer {self._api_key}"},
560        )
561        if not response.ok:
562            raise Exception(response.text)
563        response = response.json()
564        assert response["status"] == "ok"
565        id = response["id"]
566        return self.get_source(id)
567
568    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
569        """Convenience function for accessing sources.
570
571        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
572        If no source is found, creates a new source with the given parameters.
573        """
574
575        sources = self.search_source(name, stream_idx, storage_service, storage_config)
576        if len(sources) == 0:
577            return self.create_source(name, stream_idx, storage_service, storage_config)
578        return self.get_source(sources[0])
579
580    def get_spec(self, id: str) -> Spec:
581        assert type(id) is str
582        response = self._session.get(
583            f"{self._endpoint}/v2/spec/{id}",
584            headers={"Authorization": f"Bearer {self._api_key}"},
585        )
586        if not response.ok:
587            raise Exception(response.text)
588        response = response.json()
589        return Spec(response["id"], response)
590
591    def list_specs(self) -> list[str]:
592        response = self._session.get(
593            f"{self._endpoint}/v2/spec",
594            headers={"Authorization": f"Bearer {self._api_key}"},
595        )
596        if not response.ok:
597            raise Exception(response.text)
598        response = response.json()
599        return response
600
601    def create_spec(
602        self,
603        width,
604        height,
605        pix_fmt,
606        vod_segment_length,
607        frame_rate,
608        ready_hook=None,
609        steer_hook=None,
610        ttl=None,
611    ) -> Spec:
612        assert type(width) is int
613        assert type(height) is int
614        assert type(pix_fmt) is str
615        assert type(vod_segment_length) is Fraction
616        assert type(frame_rate) is Fraction
617        assert type(ready_hook) is str or ready_hook is None
618        assert type(steer_hook) is str or steer_hook is None
619        assert ttl is None or type(ttl) is int
620
621        req = {
622            "width": width,
623            "height": height,
624            "pix_fmt": pix_fmt,
625            "vod_segment_length": [
626                vod_segment_length.numerator,
627                vod_segment_length.denominator,
628            ],
629            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
630            "ready_hook": ready_hook,
631            "steer_hook": steer_hook,
632            "ttl": ttl,
633        }
634        response = self._session.post(
635            f"{self._endpoint}/v2/spec",
636            json=req,
637            headers={"Authorization": f"Bearer {self._api_key}"},
638        )
639        if not response.ok:
640            raise Exception(response.text)
641        response = response.json()
642        assert response["status"] == "ok"
643        return self.get_spec(response["id"])
644
645    def delete_spec(self, id: str):
646        assert type(id) is str
647        response = self._session.delete(
648            f"{self._endpoint}/v2/spec/{id}",
649            headers={"Authorization": f"Bearer {self._api_key}"},
650        )
651        if not response.ok:
652            raise Exception(response.text)
653        response = response.json()
654        assert response["status"] == "ok"
655
656    def export_spec(
657        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
658    ):
659        assert type(id) is str
660        assert type(path) is str
661        req = {
662            "path": path,
663            "encoder": encoder,
664            "encoder_opts": encoder_opts,
665            "format": format,
666        }
667        response = self._session.post(
668            f"{self._endpoint}/v2/spec/{id}/export",
669            json=req,
670            headers={"Authorization": f"Bearer {self._api_key}"},
671        )
672        if not response.ok:
673            raise Exception(response.text)
674        response = response.json()
675        assert response["status"] == "ok"
676
677    def push_spec_part(self, spec_id, pos, frames, terminal):
678        if type(spec_id) is Spec:
679            spec_id = spec_id._id
680        assert type(spec_id) is str
681        assert type(pos) is int
682        assert type(frames) is list
683        assert type(terminal) is bool
684
685        req_frames = []
686        for frame in frames:
687            assert type(frame) is tuple
688            assert len(frame) == 2
689            t = frame[0]
690            f = frame[1]
691            assert type(t) is Fraction
692            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
693            req_frames.append(
694                [
695                    [t.numerator, t.denominator],
696                    f._to_json_spec() if f is not None else None,
697                ]
698            )
699
700        req = {
701            "pos": pos,
702            "frames": req_frames,
703            "terminal": terminal,
704        }
705        response = self._session.post(
706            f"{self._endpoint}/v2/spec/{spec_id}/part",
707            json=req,
708            headers={"Authorization": f"Bearer {self._api_key}"},
709        )
710        if not response.ok:
711            raise Exception(response.text)
712        response = response.json()
713        assert response["status"] == "ok"
714
715    def push_spec_part_block(
716        self, spec_id: str, pos, blocks, terminal, compression="gzip"
717    ):
718        if type(spec_id) is Spec:
719            spec_id = spec_id._id
720        assert type(spec_id) is str
721        assert type(pos) is int
722        assert type(blocks) is list
723        assert type(terminal) is bool
724        assert compression is None or compression == "gzip"
725
726        req_blocks = []
727        for block in blocks:
728            assert type(block) is _FrameExpressionBlock
729            block_body = block.as_dict()
730            block_frames = len(block_body["frame_exprs"])
731            block_body = json.dumps(block_body).encode("utf-8")
732            if compression == "gzip":
733                block_body = gzip.compress(block_body, 1)
734            block_body = base64.b64encode(block_body).decode("utf-8")
735            req_blocks.append(
736                {
737                    "frames": block_frames,
738                    "compression": compression,
739                    "body": block_body,
740                }
741            )
742
743        req = {
744            "pos": pos,
745            "terminal": terminal,
746            "blocks": req_blocks,
747        }
748        response = self._session.post(
749            f"{self._endpoint}/v2/spec/{spec_id}/part_block",
750            json=req,
751            headers={"Authorization": f"Bearer {self._api_key}"},
752        )
753        if not response.ok:
754            raise Exception(response.text)
755        response = response.json()
756        assert response["status"] == "ok"
757
758    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
759        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
760        assert compression is None or compression in ["gzip"]
761        feb = _FrameExpressionBlock()
762        feb.insert_frame(frame_expr)
763        feb_body = feb.as_dict()
764
765        feb_body = json.dumps(feb_body).encode("utf-8")
766        if compression == "gzip":
767            feb_body = gzip.compress(feb_body, 1)
768        feb_body = base64.b64encode(feb_body).decode("utf-8")
769        req = {
770            "width": width,
771            "height": height,
772            "pix_fmt": pix_fmt,
773            "compression": compression,
774            "block": {
775                "frames": 1,
776                "compression": compression,
777                "body": feb_body,
778            },
779        }
780        response = self._session.post(
781            f"{self._endpoint}/v2/frame",
782            json=req,
783            headers={"Authorization": f"Bearer {self._api_key}"},
784        )
785        if not response.ok:
786            raise Exception(response.text)
787        response_body = response.content
788        assert type(response_body) is bytes
789        if compression == "gzip":
790            response_body = gzip.decompress(response_body)
791        return response_body
792
793
class SourceExpr:
    """Reference to one frame of a source, by position or by timestamp."""

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON form: an ``ILoc`` int or a ``T`` [num, den] pair."""
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": self._source._name, "index": index}}

    def _sources(self):
        """Return a one-element set holding the referenced source."""
        return {self._source}

    def _filters(self):
        """Return an empty dict."""
        return {}
825    def _filters(self):
826        return {}
827
828
829class _SourceILoc:
830    def __init__(self, source):
831        self._source = source
832
833    def __getitem__(self, idx):
834        if type(idx) is not int:
835            raise Exception(f"Source iloc index must be an integer, got a {type(idx)}")
836        return SourceExpr(self._source, idx, True)
837
838
839def _json_arg(arg, skip_data_anot=False):
840    if type(arg) is FilterExpr or type(arg) is SourceExpr:
841        return {"Frame": arg._to_json_spec()}
842    elif type(arg) is int:
843        if skip_data_anot:
844            return {"Int": arg}
845        return {"Data": {"Int": arg}}
846    elif type(arg) is str:
847        if skip_data_anot:
848            return {"String": arg}
849        return {"Data": {"String": arg}}
850    elif type(arg) is bytes:
851        arg = list(arg)
852        if skip_data_anot:
853            return {"Bytes": arg}
854        return {"Data": {"Bytes": arg}}
855    elif type(arg) is float:
856        if skip_data_anot:
857            return {"Float": arg}
858        return {"Data": {"Float": arg}}
859    elif type(arg) is bool:
860        if skip_data_anot:
861            return {"Bool": arg}
862        return {"Data": {"Bool": arg}}
863    elif type(arg) is tuple or type(arg) is list:
864        if skip_data_anot:
865            return {"List": [_json_arg(x, True) for x in list(arg)]}
866        return {"Data": {"List": [_json_arg(x, True) for x in list(arg)]}}
867    else:
868        raise Exception(f"Unknown arg type: {type(arg)}")
869
870
class Filter:
    """A video filter."""

    def __init__(self, func: str):
        """Remember *func*, the name used when the filter is serialized."""
        self._func = func

    def __call__(self, *args, **kwargs):
        """Bind this filter to the given arguments as a ``FilterExpr``."""
        bound = FilterExpr(self, args, kwargs)
        return bound
877    def __call__(self, *args, **kwargs):
878        return FilterExpr(self, args, kwargs)
879
880
class FilterExpr:
    """A filter applied to positional and keyword arguments."""

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        def show(value):
            # Strings are shown in double quotes, everything else via str().
            return f'"{value}"' if type(value) is str else str(value)

        parts = [show(arg) for arg in self._args]
        parts.extend(f"{k}={show(v)}" for k, v in self._kwargs.items())
        return f"{self._filter._func}({', '.join(parts)})"

    def _to_json_spec(self):
        """Return the JSON form with every argument converted by ``_json_arg``."""
        return {
            "Filter": {
                "name": self._filter._func,
                "args": [_json_arg(arg) for arg in self._args],
                "kwargs": {k: _json_arg(v) for k, v in self._kwargs.items()},
            }
        }

    def _sources(self):
        """Return the set of sources referenced by direct frame arguments, recursively."""
        found = set()
        for child in (*self._args, *self._kwargs.values()):
            if type(child) is FilterExpr or type(child) is SourceExpr:
                found.update(child._sources())
        return found

    def _filters(self):
        """Return a name-to-``Filter`` dict for this filter and nested ``FilterExpr`` arguments."""
        collected = {self._filter._func: self._filter}
        for child in (*self._args, *self._kwargs.values()):
            if type(child) is FilterExpr:
                collected.update(child._filters())
        return collected
923                f = {**f, **arg._filters()}
924        return f
class Source:
class Source:
    """A video source: its id, frame format and frame timestamps.

    `src` is a mapping providing "width", "height", "pix_fmt" and "ts"; each
    "ts" entry is a pair turned into `Fraction(entry[0], entry[1])`.
    """

    def __init__(self, id: str, src):
        self._name = id
        # Only the three format fields are kept from `src`.
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._ts = [Fraction(entry[0], entry[1]) for entry in src["ts"]]
        # Accessor object built from this source (see `_SourceILoc`).
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the id this source was created with."""
        return self._name

    def fmt(self):
        """Return a fresh copy of the format dict."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a fresh copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        """Return a `SourceExpr` for timestamp `idx`, which must be exactly a `Fraction`."""
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return f"Source({self._name})"
Source(id: str, src)
400    def __init__(self, id: str, src):
401        self._name = id
402        self._fmt = {
403            "width": src["width"],
404            "height": src["height"],
405            "pix_fmt": src["pix_fmt"],
406        }
407        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
408        self.iloc = _SourceILoc(self)
iloc
def id(self) -> str:
410    def id(self) -> str:
411        return self._name
def fmt(self):
413    def fmt(self):
414        return {**self._fmt}
def ts(self) -> list[fractions.Fraction]:
416    def ts(self) -> list[Fraction]:
417        return self._ts.copy()
class Spec:
class Spec:
    """A spec: its id, frame format and video-on-demand endpoint.

    `src` is a mapping providing "width", "height", "pix_fmt" and
    "vod_endpoint".
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        endpoint = src["vod_endpoint"]
        self._vod_endpoint = endpoint
        # hls.js is addressed at the root of the endpoint's scheme and host.
        location = urlparse(endpoint)
        self._hls_js_url = f"{location.scheme}://{location.netloc}/hls.js"

    def id(self) -> str:
        """Return the id this spec was created with."""
        return self._id

    def play(self, method):
        """Hand this spec's playlist, status and hls.js URLs to `_play`.

        The playlist and status URLs are formed by appending directly to the
        stored endpoint string, with no separator added.
        """
        base = self._vod_endpoint
        return _play(
            self._id,
            f"{base}playlist.m3u8",
            self._hls_js_url,
            method=method,
            status_url=f"{base}status",
        )
Spec(id: str, src)
432    def __init__(self, id: str, src):
433        self._id = id
434        self._fmt = {
435            "width": src["width"],
436            "height": src["height"],
437            "pix_fmt": src["pix_fmt"],
438        }
439        self._vod_endpoint = src["vod_endpoint"]
440        parsed_url = urlparse(self._vod_endpoint)
441        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
def id(self) -> str:
443    def id(self) -> str:
444        return self._id
def play(self, method):
446    def play(self, method):
447        url = f"{self._vod_endpoint}playlist.m3u8"
448        status_url = f"{self._vod_endpoint}status"
449        hls_js_url = self._hls_js_url
450        return _play(self._id, url, hls_js_url, method=method, status_url=status_url)
class Server:
class Server:
    """Client for the `/v2` HTTP API of a vidformer server.

    Every request is sent through one `requests.Session` and carries an
    `Authorization: Bearer <api_key>` header.

    Error behaviour shared by all methods:
      * a response whose `ok` flag is false raises `Exception(response.text)`;
      * a JSON reply whose "status" is not "ok" raises `AssertionError`;
      * argument type checks are plain `assert` statements.
    """

    def __init__(
        self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None
    ):
        """Validate `endpoint`, open a session and check the key via `/v2/auth`.

        Raises:
            Exception: if `endpoint` lacks an http:// or https:// prefix, ends
                with "/", or the auth request is rejected.
        """
        if not endpoint.startswith(("http://", "https://")):
            raise Exception("Endpoint must start with http:// or https://")
        if endpoint.endswith("/"):
            raise Exception("Endpoint must not end with /")
        self._endpoint = endpoint

        self._api_key = api_key
        self._session = requests.Session()
        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
        self._request_ok("get", "/v2/auth")
        self._vod_only = vod_only
        self._cv2_writer_init_callback = cv2_writer_init_callback

    # ------------------------------------------------------------------
    # Private helpers shared by the API methods below
    # ------------------------------------------------------------------

    def _request(self, verb, path, **kwargs):
        """Send one authenticated request and return the response.

        `verb` names the session method to call ("get", "post", "delete");
        `path` is appended to the endpoint; `kwargs` are passed through.

        Raises:
            Exception: carrying the response text when `response.ok` is false.
        """
        send = getattr(self._session, verb)
        response = send(
            f"{self._endpoint}{path}",
            headers={"Authorization": f"Bearer {self._api_key}"},
            **kwargs,
        )
        if not response.ok:
            raise Exception(response.text)
        return response

    def _request_json(self, verb, path, **kwargs):
        """Like `_request`, but return the decoded JSON body."""
        return self._request(verb, path, **kwargs).json()

    def _request_ok(self, verb, path, **kwargs):
        """Like `_request_json`, additionally requiring `"status" == "ok"`.

        Raises:
            AssertionError: when the reported status is anything else. This is
                raised explicitly instead of through an `assert` statement so
                the check is not stripped when Python runs with `-O`.
        """
        payload = self._request_json(verb, path, **kwargs)
        if payload["status"] != "ok":
            raise AssertionError(f"Server reported status {payload['status']!r}")
        return payload

    @staticmethod
    def _source_request(name, stream_idx, storage_service, storage_config):
        """Type-check source parameters and return them as a request body."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for k, v in storage_config.items():
            assert type(k) is str
            assert type(v) is str
        return {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }

    @staticmethod
    def _encode_block(body, compression):
        """Encode a block dict as JSON, gzip it (level 1) if requested, and return base64 text."""
        raw = json.dumps(body).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        return base64.b64encode(raw).decode("utf-8")

    # ------------------------------------------------------------------
    # Simple accessors
    # ------------------------------------------------------------------

    def is_vod_only(self) -> bool:
        """Return the `vod_only` flag given at construction."""
        return self._vod_only

    def cv2_writer_init_callback(self):
        """Return the `cv2_writer_init_callback` given at construction."""
        return self._cv2_writer_init_callback

    # ------------------------------------------------------------------
    # Sources
    # ------------------------------------------------------------------

    def get_source(self, id: str) -> Source:
        """Fetch `/v2/source/{id}` and wrap the reply in a `Source`."""
        assert type(id) is str
        payload = self._request_json("get", f"/v2/source/{id}")
        return Source(payload["id"], payload)

    def list_sources(self) -> list[str]:
        """Return the decoded JSON reply of `GET /v2/source`."""
        return self._request_json("get", "/v2/source")

    def delete_source(self, id: str):
        """Issue `DELETE /v2/source/{id}` and require an "ok" status."""
        assert type(id) is str
        self._request_ok("delete", f"/v2/source/{id}")

    def search_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> list[str]:
        """POST the given parameters to `/v2/source/search` and return the decoded reply."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        return self._request_json("post", "/v2/source/search", json=req)

    def create_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> Source:
        """POST the given parameters to `/v2/source`, then fetch the created source by its id."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        payload = self._request_ok("post", "/v2/source", json=req)
        return self.get_source(payload["id"])

    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
        """Convenience function for accessing sources.

        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
        If no source is found, creates a new source with the given parameters.
        """

        sources = self.search_source(name, stream_idx, storage_service, storage_config)
        if len(sources) == 0:
            return self.create_source(name, stream_idx, storage_service, storage_config)
        return self.get_source(sources[0])

    # ------------------------------------------------------------------
    # Specs
    # ------------------------------------------------------------------

    def get_spec(self, id: str) -> Spec:
        """Fetch `/v2/spec/{id}` and wrap the reply in a `Spec`."""
        assert type(id) is str
        payload = self._request_json("get", f"/v2/spec/{id}")
        return Spec(payload["id"], payload)

    def list_specs(self) -> list[str]:
        """Return the decoded JSON reply of `GET /v2/spec`."""
        return self._request_json("get", "/v2/spec")

    def create_spec(
        self,
        width,
        height,
        pix_fmt,
        vod_segment_length,
        frame_rate,
        ready_hook=None,
        steer_hook=None,
        ttl=None,
    ) -> Spec:
        """Create a spec via `POST /v2/spec` and fetch it back by its id.

        `vod_segment_length` and `frame_rate` must be `Fraction`s; they are
        sent as `[numerator, denominator]` pairs. `ready_hook` / `steer_hook`
        must be `str` or None, `ttl` an `int` or None.
        """
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str
        assert type(vod_segment_length) is Fraction
        assert type(frame_rate) is Fraction
        assert type(ready_hook) is str or ready_hook is None
        assert type(steer_hook) is str or steer_hook is None
        assert ttl is None or type(ttl) is int

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "vod_segment_length": [
                vod_segment_length.numerator,
                vod_segment_length.denominator,
            ],
            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
            "ready_hook": ready_hook,
            "steer_hook": steer_hook,
            "ttl": ttl,
        }
        payload = self._request_ok("post", "/v2/spec", json=req)
        return self.get_spec(payload["id"])

    def delete_spec(self, id: str):
        """Issue `DELETE /v2/spec/{id}` and require an "ok" status."""
        assert type(id) is str
        self._request_ok("delete", f"/v2/spec/{id}")

    def export_spec(
        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
    ):
        """POST an export request for spec `id` and require an "ok" status.

        Only `id` and `path` are type-checked here; the remaining options are
        forwarded unchanged in the request body.
        """
        assert type(id) is str
        assert type(path) is str
        req = {
            "path": path,
            "encoder": encoder,
            "encoder_opts": encoder_opts,
            "format": format,
        }
        self._request_ok("post", f"/v2/spec/{id}/export", json=req)

    def push_spec_part(self, spec_id, pos, frames, terminal):
        """Push `(timestamp, expression)` frames to `/v2/spec/{spec_id}/part`.

        `spec_id` may be a spec id string or a `Spec`. Each frame is a 2-tuple
        of a `Fraction` timestamp and a `SourceExpr`, `FilterExpr` or None.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(frames) is list
        assert type(terminal) is bool

        req_frames = []
        for frame in frames:
            assert type(frame) is tuple
            assert len(frame) == 2
            t = frame[0]
            f = frame[1]
            assert type(t) is Fraction
            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
            req_frames.append(
                [
                    [t.numerator, t.denominator],
                    f._to_json_spec() if f is not None else None,
                ]
            )

        req = {
            "pos": pos,
            "frames": req_frames,
            "terminal": terminal,
        }
        self._request_ok("post", f"/v2/spec/{spec_id}/part", json=req)

    def push_spec_part_block(
        self, spec_id: str, pos, blocks, terminal, compression="gzip"
    ):
        """Push `_FrameExpressionBlock`s to `/v2/spec/{spec_id}/part_block`.

        `spec_id` may be a spec id string or a `Spec` (despite the `str`
        annotation). `compression` must be None or "gzip"; each block body is
        sent as base64 text of the (optionally gzipped) JSON.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(blocks) is list
        assert type(terminal) is bool
        assert compression is None or compression == "gzip"

        req_blocks = []
        for block in blocks:
            assert type(block) is _FrameExpressionBlock
            block_body = block.as_dict()
            req_blocks.append(
                {
                    "frames": len(block_body["frame_exprs"]),
                    "compression": compression,
                    "body": self._encode_block(block_body, compression),
                }
            )

        req = {
            "pos": pos,
            "terminal": terminal,
            "blocks": req_blocks,
        }
        self._request_ok("post", f"/v2/spec/{spec_id}/part_block", json=req)

    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
        """Request a single frame via `POST /v2/frame` and return the reply bytes.

        `frame_expr` must be a `FilterExpr` or `SourceExpr`. With
        `compression="gzip"` the request block is gzipped and the response
        body is gunzipped before being returned.
        """
        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
        assert compression is None or compression in ["gzip"]
        feb = _FrameExpressionBlock()
        feb.insert_frame(frame_expr)

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "compression": compression,
            "block": {
                "frames": 1,
                "compression": compression,
                "body": self._encode_block(feb.as_dict(), compression),
            },
        }
        response_body = self._request("post", "/v2/frame", json=req).content
        assert type(response_body) is bytes
        if compression == "gzip":
            response_body = gzip.decompress(response_body)
        return response_body
Server(endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None)
454    def __init__(
455        self, endpoint: str, api_key: str, vod_only=False, cv2_writer_init_callback=None
456    ):
457        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
458            raise Exception("Endpoint must start with http:// or https://")
459        if endpoint.endswith("/"):
460            raise Exception("Endpoint must not end with /")
461        self._endpoint = endpoint
462
463        self._api_key = api_key
464        self._session = requests.Session()
465        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
466        response = self._session.get(
467            f"{self._endpoint}/v2/auth",
468            headers={"Authorization": f"Bearer {self._api_key}"},
469        )
470        if not response.ok:
471            raise Exception(response.text)
472        response = response.json()
473        assert response["status"] == "ok"
474        self._vod_only = vod_only
475        self._cv2_writer_init_callback = cv2_writer_init_callback
def is_vod_only(self) -> bool:
477    def is_vod_only(self) -> bool:
478        return self._vod_only
def cv2_writer_init_callback(self):
480    def cv2_writer_init_callback(self):
481        return self._cv2_writer_init_callback
def get_source(self, id: str) -> Source:
483    def get_source(self, id: str) -> Source:
484        assert type(id) is str
485        response = self._session.get(
486            f"{self._endpoint}/v2/source/{id}",
487            headers={"Authorization": f"Bearer {self._api_key}"},
488        )
489        if not response.ok:
490            raise Exception(response.text)
491        response = response.json()
492        return Source(response["id"], response)
def list_sources(self) -> list[str]:
494    def list_sources(self) -> list[str]:
495        response = self._session.get(
496            f"{self._endpoint}/v2/source",
497            headers={"Authorization": f"Bearer {self._api_key}"},
498        )
499        if not response.ok:
500            raise Exception(response.text)
501        response = response.json()
502        return response
def delete_source(self, id: str):
504    def delete_source(self, id: str):
505        assert type(id) is str
506        response = self._session.delete(
507            f"{self._endpoint}/v2/source/{id}",
508            headers={"Authorization": f"Bearer {self._api_key}"},
509        )
510        if not response.ok:
511            raise Exception(response.text)
512        response = response.json()
513        assert response["status"] == "ok"
def search_source(self, name, stream_idx, storage_service, storage_config) -> list[str]:
515    def search_source(
516        self, name, stream_idx, storage_service, storage_config
517    ) -> list[str]:
518        assert type(name) is str
519        assert type(stream_idx) is int
520        assert type(storage_service) is str
521        assert type(storage_config) is dict
522        for k, v in storage_config.items():
523            assert type(k) is str
524            assert type(v) is str
525        req = {
526            "name": name,
527            "stream_idx": stream_idx,
528            "storage_service": storage_service,
529            "storage_config": storage_config,
530        }
531        response = self._session.post(
532            f"{self._endpoint}/v2/source/search",
533            json=req,
534            headers={"Authorization": f"Bearer {self._api_key}"},
535        )
536        if not response.ok:
537            raise Exception(response.text)
538        response = response.json()
539        return response
def create_source(self, name, stream_idx, storage_service, storage_config) -> Source:
541    def create_source(
542        self, name, stream_idx, storage_service, storage_config
543    ) -> Source:
544        assert type(name) is str
545        assert type(stream_idx) is int
546        assert type(storage_service) is str
547        assert type(storage_config) is dict
548        for k, v in storage_config.items():
549            assert type(k) is str
550            assert type(v) is str
551        req = {
552            "name": name,
553            "stream_idx": stream_idx,
554            "storage_service": storage_service,
555            "storage_config": storage_config,
556        }
557        response = self._session.post(
558            f"{self._endpoint}/v2/source",
559            json=req,
560            headers={"Authorization": f"Bearer {self._api_key}"},
561        )
562        if not response.ok:
563            raise Exception(response.text)
564        response = response.json()
565        assert response["status"] == "ok"
566        id = response["id"]
567        return self.get_source(id)
def source(self, name, stream_idx, storage_service, storage_config) -> Source:
569    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
570        """Convenience function for accessing sources.
571
572        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
573        If no source is found, creates a new source with the given parameters.
574        """
575
576        sources = self.search_source(name, stream_idx, storage_service, storage_config)
577        if len(sources) == 0:
578            return self.create_source(name, stream_idx, storage_service, storage_config)
579        return self.get_source(sources[0])

Convenience function for accessing sources.

Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.

def get_spec(self, id: str) -> Spec:
581    def get_spec(self, id: str) -> Spec:
582        assert type(id) is str
583        response = self._session.get(
584            f"{self._endpoint}/v2/spec/{id}",
585            headers={"Authorization": f"Bearer {self._api_key}"},
586        )
587        if not response.ok:
588            raise Exception(response.text)
589        response = response.json()
590        return Spec(response["id"], response)
def list_specs(self) -> list[str]:
592    def list_specs(self) -> list[str]:
593        response = self._session.get(
594            f"{self._endpoint}/v2/spec",
595            headers={"Authorization": f"Bearer {self._api_key}"},
596        )
597        if not response.ok:
598            raise Exception(response.text)
599        response = response.json()
600        return response
def create_spec(self, width, height, pix_fmt, vod_segment_length, frame_rate, ready_hook=None, steer_hook=None, ttl=None) -> Spec:
602    def create_spec(
603        self,
604        width,
605        height,
606        pix_fmt,
607        vod_segment_length,
608        frame_rate,
609        ready_hook=None,
610        steer_hook=None,
611        ttl=None,
612    ) -> Spec:
613        assert type(width) is int
614        assert type(height) is int
615        assert type(pix_fmt) is str
616        assert type(vod_segment_length) is Fraction
617        assert type(frame_rate) is Fraction
618        assert type(ready_hook) is str or ready_hook is None
619        assert type(steer_hook) is str or steer_hook is None
620        assert ttl is None or type(ttl) is int
621
622        req = {
623            "width": width,
624            "height": height,
625            "pix_fmt": pix_fmt,
626            "vod_segment_length": [
627                vod_segment_length.numerator,
628                vod_segment_length.denominator,
629            ],
630            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
631            "ready_hook": ready_hook,
632            "steer_hook": steer_hook,
633            "ttl": ttl,
634        }
635        response = self._session.post(
636            f"{self._endpoint}/v2/spec",
637            json=req,
638            headers={"Authorization": f"Bearer {self._api_key}"},
639        )
640        if not response.ok:
641            raise Exception(response.text)
642        response = response.json()
643        assert response["status"] == "ok"
644        return self.get_spec(response["id"])
def delete_spec(self, id: str):
646    def delete_spec(self, id: str):
647        assert type(id) is str
648        response = self._session.delete(
649            f"{self._endpoint}/v2/spec/{id}",
650            headers={"Authorization": f"Bearer {self._api_key}"},
651        )
652        if not response.ok:
653            raise Exception(response.text)
654        response = response.json()
655        assert response["status"] == "ok"
def export_spec(self, id: str, path: str, encoder=None, encoder_opts=None, format=None):
657    def export_spec(
658        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
659    ):
660        assert type(id) is str
661        assert type(path) is str
662        req = {
663            "path": path,
664            "encoder": encoder,
665            "encoder_opts": encoder_opts,
666            "format": format,
667        }
668        response = self._session.post(
669            f"{self._endpoint}/v2/spec/{id}/export",
670            json=req,
671            headers={"Authorization": f"Bearer {self._api_key}"},
672        )
673        if not response.ok:
674            raise Exception(response.text)
675        response = response.json()
676        assert response["status"] == "ok"
def push_spec_part(self, spec_id, pos, frames, terminal):
678    def push_spec_part(self, spec_id, pos, frames, terminal):
679        if type(spec_id) is Spec:
680            spec_id = spec_id._id
681        assert type(spec_id) is str
682        assert type(pos) is int
683        assert type(frames) is list
684        assert type(terminal) is bool
685
686        req_frames = []
687        for frame in frames:
688            assert type(frame) is tuple
689            assert len(frame) == 2
690            t = frame[0]
691            f = frame[1]
692            assert type(t) is Fraction
693            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
694            req_frames.append(
695                [
696                    [t.numerator, t.denominator],
697                    f._to_json_spec() if f is not None else None,
698                ]
699            )
700
701        req = {
702            "pos": pos,
703            "frames": req_frames,
704            "terminal": terminal,
705        }
706        response = self._session.post(
707            f"{self._endpoint}/v2/spec/{spec_id}/part",
708            json=req,
709            headers={"Authorization": f"Bearer {self._api_key}"},
710        )
711        if not response.ok:
712            raise Exception(response.text)
713        response = response.json()
714        assert response["status"] == "ok"
def push_spec_part_block(self, spec_id: str, pos, blocks, terminal, compression='gzip'):
716    def push_spec_part_block(
717        self, spec_id: str, pos, blocks, terminal, compression="gzip"
718    ):
719        if type(spec_id) is Spec:
720            spec_id = spec_id._id
721        assert type(spec_id) is str
722        assert type(pos) is int
723        assert type(blocks) is list
724        assert type(terminal) is bool
725        assert compression is None or compression == "gzip"
726
727        req_blocks = []
728        for block in blocks:
729            assert type(block) is _FrameExpressionBlock
730            block_body = block.as_dict()
731            block_frames = len(block_body["frame_exprs"])
732            block_body = json.dumps(block_body).encode("utf-8")
733            if compression == "gzip":
734                block_body = gzip.compress(block_body, 1)
735            block_body = base64.b64encode(block_body).decode("utf-8")
736            req_blocks.append(
737                {
738                    "frames": block_frames,
739                    "compression": compression,
740                    "body": block_body,
741                }
742            )
743
744        req = {
745            "pos": pos,
746            "terminal": terminal,
747            "blocks": req_blocks,
748        }
749        response = self._session.post(
750            f"{self._endpoint}/v2/spec/{spec_id}/part_block",
751            json=req,
752            headers={"Authorization": f"Bearer {self._api_key}"},
753        )
754        if not response.ok:
755            raise Exception(response.text)
756        response = response.json()
757        assert response["status"] == "ok"
def frame(self, width, height, pix_fmt, frame_expr, compression='gzip'):
759    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
760        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
761        assert compression is None or compression in ["gzip"]
762        feb = _FrameExpressionBlock()
763        feb.insert_frame(frame_expr)
764        feb_body = feb.as_dict()
765
766        feb_body = json.dumps(feb_body).encode("utf-8")
767        if compression == "gzip":
768            feb_body = gzip.compress(feb_body, 1)
769        feb_body = base64.b64encode(feb_body).decode("utf-8")
770        req = {
771            "width": width,
772            "height": height,
773            "pix_fmt": pix_fmt,
774            "compression": compression,
775            "block": {
776                "frames": 1,
777                "compression": compression,
778                "body": feb_body,
779            },
780        }
781        response = self._session.post(
782            f"{self._endpoint}/v2/frame",
783            json=req,
784            headers={"Authorization": f"Bearer {self._api_key}"},
785        )
786        if not response.ok:
787            raise Exception(response.text)
788        response_body = response.content
789        assert type(response_body) is bytes
790        if compression == "gzip":
791            response_body = gzip.decompress(response_body)
792        return response_body
class SourceExpr:
class SourceExpr:
    """A reference to one frame of a source.

    `idx` is read as an integer position when `is_iloc` is true, otherwise as
    a value exposing `numerator` / `denominator` (a timestamp).
    """

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON-ready dict, using an "ILoc" or "T" index key by mode."""
        video = self._source._name
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": video, "index": index}}

    def _sources(self):
        """Return a one-element set holding the referenced source."""
        return {self._source}

    def _filters(self):
        """Return an empty dict: a source reference involves no filters."""
        return {}
SourceExpr(source, idx, is_iloc)
796    def __init__(self, source, idx, is_iloc):
797        self._source = source
798        self._idx = idx
799        self._is_iloc = is_iloc
class Filter:
class Filter:
    """A video filter.

    Calling an instance does not run anything; it only records the call as a
    `FilterExpr`.
    """

    def __init__(self, func: str):
        # Filter name; `FilterExpr` reads it back for its repr and JSON spec.
        self._func = func

    def __call__(self, *args, **kwargs):
        """Wrap this filter and the supplied arguments in a `FilterExpr`."""
        return FilterExpr(filter=self, args=args, kwargs=kwargs)

A video filter.

Filter(func: str)
875    def __init__(self, func: str):
876        self._func = func
class FilterExpr:
class FilterExpr:
    """A recorded call of a `Filter` with positional and keyword arguments.

    Arguments may themselves be `FilterExpr` / `SourceExpr` objects, which is
    how nested expressions are represented.
    """

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        def show(value):
            # Exact `str` instances are shown in double quotes; everything
            # else falls back to str().
            if type(value) is str:
                return f'"{value}"'
            return str(value)

        rendered = [show(value) for value in self._args]
        rendered.extend(f"{key}={show(value)}" for key, value in self._kwargs.items())
        return f"{self._filter._func}({', '.join(rendered)})"

    def _to_json_spec(self):
        """Return the JSON-ready dict for this call, converting each argument with `_json_arg`."""
        positional = [_json_arg(value) for value in self._args]
        keyword = {key: _json_arg(value) for key, value in self._kwargs.items()}
        return {
            "Filter": {
                "name": self._filter._func,
                "args": positional,
                "kwargs": keyword,
            }
        }

    def _operands(self):
        """Return positional arguments followed by keyword argument values."""
        return (*self._args, *self._kwargs.values())

    def _sources(self):
        """Collect the set of sources referenced by nested expressions."""
        found = set()
        for operand in self._operands():
            kind = type(operand)
            if kind is FilterExpr or kind is SourceExpr:
                found.update(operand._sources())
        return found

    def _filters(self):
        """Map filter name -> `Filter` for this call and all nested filter calls."""
        collected = {self._filter._func: self._filter}
        for operand in self._operands():
            if type(operand) is FilterExpr:
                collected.update(operand._filters())
        return collected
FilterExpr(filter: Filter, args, kwargs)
883    def __init__(self, filter: Filter, args, kwargs):
884        self._filter = filter
885        self._args = args
886        self._kwargs = kwargs