vidformer

  1"""
  2vidformer-py is a Python 🐍 interface for [vidformer](https://github.com/ixlab/vidformer).
  3
  4**Quick links:**
  5* [📦 PyPI](https://pypi.org/project/vidformer/)
  6* [📘 Documentation - vidformer-py](https://ixlab.github.io/vidformer/vidformer-py/pdoc/)
  7* [📘 Documentation - vidformer.cv2](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/cv2.html)
  8* [📘 Documentation - vidformer.supervision](https://ixlab.github.io/vidformer/vidformer-py/pdoc/vidformer/supervision.html)
  9* [🧑‍💻 Source Code](https://github.com/ixlab/vidformer/tree/main/vidformer-py/)
 10"""
 11
 12__version__ = "1.0.1"
 13
 14
 15import base64
 16import gzip
 17import json
 18import struct
 19import time
 20from fractions import Fraction
 21from urllib.parse import urlparse
 22
 23import requests
 24
 25_in_notebook = False
 26try:
 27    from IPython import get_ipython
 28
 29    if "IPKernelApp" in get_ipython().config:
 30        _in_notebook = True
 31except Exception:
 32    pass
 33
 34
 35def _wait_for_url(url, max_attempts=150, delay=0.1):
 36    for attempt in range(max_attempts):
 37        try:
 38            response = requests.get(url)
 39            if response.status_code == 200:
 40                return response.text.strip()
 41            else:
 42                time.sleep(delay)
 43        except requests.exceptions.RequestException:
 44            time.sleep(delay)
 45    return None
 46
 47
 48def _play(namespace, hls_video_url, hls_js_url, method="html", status_url=None):
 49    # The namespace is so multiple videos in one tab don't conflict
 50
 51    if method == "html":
 52        from IPython.display import HTML
 53
 54        if not status_url:
 55            html_code = f"""
 56<!DOCTYPE html>
 57<html>
 58<head>
 59    <title>HLS Video Player</title>
 60    <!-- Include hls.js library -->
 61    <script src="{hls_js_url}"></script>
 62</head>
 63<body>
 64    <video id="video-{namespace}" controls width="640" height="360" autoplay></video>
 65    <script>
 66        var video = document.getElementById('video-{namespace}');
 67        var videoSrc = '{hls_video_url}';
 68
 69        if (Hls.isSupported()) {{
 70            var hls = new Hls();
 71            hls.loadSource(videoSrc);
 72            hls.attachMedia(video);
 73            hls.on(Hls.Events.MANIFEST_PARSED, function() {{
 74                video.play();
 75            }});
 76        }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
 77            video.src = videoSrc;
 78            video.addEventListener('loadedmetadata', function() {{
 79                video.play();
 80            }});
 81        }} else {{
 82            console.error('This browser does not appear to support HLS.');
 83        }}
 84    </script>
 85</body>
 86</html>
 87"""
 88            return HTML(data=html_code)
 89        else:
 90            html_code = f"""
 91<!DOCTYPE html>
 92<html>
 93<head>
 94    <title>HLS Video Player</title>
 95    <script src="{hls_js_url}"></script>
 96</head>
 97<body>
 98    <div id="container-{namespace}"></div>
 99    <script>
100        var statusUrl = '{status_url}';
101        var videoSrc = '{hls_video_url}';
102        var videoNamespace = '{namespace}';
103
104        function showWaiting() {{
105            document.getElementById('container-{namespace}').textContent = 'Waiting...';
106            pollStatus();
107        }}
108
109        function pollStatus() {{
110            setTimeout(function() {{
111                fetch(statusUrl)
112                    .then(r => r.json())
113                    .then(res => {{
114                        if (res.ready) {{
115                            document.getElementById('container-{namespace}').textContent = '';
116                            attachHls();
117                        }} else {{
118                            pollStatus();
119                        }}
120                    }})
121                    .catch(e => {{
122                        console.error(e);
123                        pollStatus();
124                    }});
125            }}, 250);
126        }}
127
128        function attachHls() {{
129            var container = document.getElementById('container-{namespace}');
130            container.textContent = '';
131            var video = document.createElement('video');
132            video.id = 'video-' + videoNamespace;
133            video.controls = true;
134            video.width = 640;
135            video.height = 360;
136            container.appendChild(video);
137            if (Hls.isSupported()) {{
138                var hls = new Hls();
139                hls.loadSource(videoSrc);
140                hls.attachMedia(video);
141                hls.on(Hls.Events.MANIFEST_PARSED, function() {{
142                    video.play();
143                }});
144            }} else if (video.canPlayType('application/vnd.apple.mpegurl')) {{
145                video.src = videoSrc;
146                video.addEventListener('loadedmetadata', function() {{
147                    video.play();
148                }});
149            }}
150        }}
151
152        fetch(statusUrl)
153            .then(r => r.json())
154            .then(res => {{
155                if (res.ready) {{
156                    attachHls();
157                }} else {{
158                    showWaiting();
159                }}
160            }})
161            .catch(e => {{
162                console.error(e);
163                showWaiting();
164            }});
165    </script>
166</body>
167</html>
168"""
169        return HTML(data=html_code)
170    elif method == "link":
171        return hls_video_url
172    else:
173        raise ValueError("Invalid method")
174
175
def _feb_expr_coded_as_scalar(expr) -> bool:
    """Tell whether ``expr`` is written inline by ``_FrameExpressionBlock``.

    Returns ``False`` for a ``FilterExpr`` and for any list/tuple that has
    more than three members or a member that is not an ``int`` in the signed
    16-bit range; such values are inserted separately and referenced by
    index. Everything else accepted (int, float, str, bytes, bool,
    ``SourceExpr`` and short lists of small ints) returns ``True``.
    """
    kind = type(expr)
    if kind is FilterExpr:
        return False
    if kind is tuple or kind is list:
        if len(expr) > 3:
            return False
        return all(type(x) is int and -(2**15) <= x < 2**15 for x in expr)
    assert kind in [int, float, str, bytes, SourceExpr, bool, list]
    return True
189
190
191class _FrameExpressionBlock:
192    def __init__(self):
193        self._functions = []
194        self._literals = []
195        self._sources = []
196        self._kwarg_keys = []
197        self._source_fracs = []
198        self._exprs = []
199        self._frame_exprs = []
200
201    def __len__(self):
202        return len(self._frame_exprs)
203
204    def insert_expr(self, expr):
205        if type(expr) is SourceExpr or type(expr) is FilterExpr:
206            return self.insert_frame_expr(expr)
207        else:
208            return self.insert_data_expr(expr)
209
210    def insert_data_expr(self, data):
211        if type(data) is tuple:
212            data = list(data)
213        if type(data) is bool:
214            self._exprs.append(0x01000000_00000000 | int(data))
215            return len(self._exprs) - 1
216        elif type(data) is int:
217            if data >= -(2**31) and data < 2**31:
218                self._exprs.append(data & 0xFFFFFFFF)
219            else:
220                self._literals.append(_json_arg(data, skip_data_anot=True))
221                self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
222            return len(self._exprs) - 1
223        elif type(data) is float:
224            self._exprs.append(
225                0x02000000_00000000 | int.from_bytes(struct.pack("f", data)[::-1])
226            )
227        elif type(data) is str:
228            self._literals.append(_json_arg(data, skip_data_anot=True))
229            self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
230        elif type(data) is bytes:
231            self._literals.append(_json_arg(data, skip_data_anot=True))
232            self._exprs.append(0x40000000_00000000 | len(self._literals) - 1)
233        elif type(data) is list:
234            if len(data) == 0:
235                self._exprs.append(0x03000000_00000000)
236                return len(self._exprs) - 1
237            if (
238                len(data) == 1
239                and type(data[0]) is int
240                and data[0] >= -(2**15)
241                and data[0] < 2**15
242            ):
243                self._exprs.append(0x04000000_00000000 | (data[0] & 0xFFFF))
244                return len(self._exprs) - 1
245            if (
246                len(data) == 2
247                and type(data[0]) is int
248                and data[0] >= -(2**15)
249                and data[0] < 2**15
250                and type(data[1]) is int
251                and data[1] >= -(2**15)
252                and data[1] < 2**15
253            ):
254                self._exprs.append(
255                    0x05000000_00000000
256                    | ((data[0] & 0xFFFF) << 16)
257                    | (data[1] & 0xFFFF)
258                )
259                return len(self._exprs) - 1
260            if (
261                len(data) == 3
262                and type(data[0]) is int
263                and data[0] >= -(2**15)
264                and data[0] < 2**15
265                and type(data[1]) is int
266                and data[1] >= -(2**15)
267                and data[1] < 2**15
268                and type(data[2]) is int
269                and data[2] >= -(2**15)
270                and data[2] < 2**15
271            ):
272                self._exprs.append(
273                    0x06000000_00000000
274                    | ((data[0] & 0xFFFF) << 32)
275                    | ((data[1] & 0xFFFF) << 16)
276                    | (data[2] & 0xFFFF)
277                )
278                return len(self._exprs) - 1
279            out = len(self._exprs)
280            member_idxs = []
281            for member in data:
282                if _feb_expr_coded_as_scalar(member):
283                    member_idxs.append(None)
284                else:
285                    member_idxs.append(self.insert_data_expr(member))
286
287            self._exprs.append(0x42000000_00000000 | len(data))
288
289            for i in range(len(data)):
290                if member_idxs[i] is None:
291                    self.insert_data_expr(data[i])
292                else:
293                    self._exprs.append(0x45000000_00000000 | member_idxs[i])
294
295            return out
296        else:
297            raise Exception("Invalid data type")
298
299    def insert_frame_expr(self, frame):
300        if type(frame) is SourceExpr:
301            source = frame._source._name
302            if source in self._sources:
303                source_idx = self._sources.index(source)
304            else:
305                source_idx = len(self._sources)
306                self._sources.append(source)
307            if frame._is_iloc:
308                self._exprs.append(
309                    0x43000000_00000000 | (source_idx << 32) | frame._idx
310                )
311            else:
312                idx = len(self._source_fracs) // 2
313                self._source_fracs.append(frame._idx.numerator)
314                self._source_fracs.append(frame._idx.denominator)
315                self._exprs.append(0x44000000_00000000 | (source_idx << 32) | idx)
316            return len(self._exprs) - 1
317        elif type(frame) is FilterExpr:
318            func = frame._filter._func
319            if func in self._functions:
320                func_idx = self._functions.index(func)
321            else:
322                func_idx = len(self._functions)
323                self._functions.append(func)
324            len_args = len(frame._args)
325            len_kwargs = len(frame._kwargs)
326
327            arg_idxs = []
328            for arg in frame._args:
329                if _feb_expr_coded_as_scalar(arg):
330                    arg_idxs.append(None)
331                else:
332                    arg_idxs.append(self.insert_expr(arg))
333            kwarg_idxs = {}
334            for k, v in frame._kwargs.items():
335                if _feb_expr_coded_as_scalar(v):
336                    kwarg_idxs[k] = None
337                else:
338                    kwarg_idxs[k] = self.insert_expr(v)
339
340            out_idx = len(self._exprs)
341            self._exprs.append(
342                0x41000000_00000000 | (len_args << 24) | (len_kwargs << 16) | func_idx
343            )
344            for i in range(len_args):
345                if arg_idxs[i] is None:
346                    # It's a scalar
347                    self.insert_expr(frame._args[i])
348                else:
349                    # It's an expression pointer
350                    self._exprs.append(0x45000000_00000000 | arg_idxs[i])
351            for k, v in frame._kwargs.items():
352                if k in self._kwarg_keys:
353                    k_idx = self._kwarg_keys.index(k)
354                else:
355                    k_idx = len(self._kwarg_keys)
356                    self._kwarg_keys.append(k)
357                self._exprs.append(0x46000000_00000000 | k_idx)
358                if kwarg_idxs[k] is None:
359                    # It's a scalar
360                    self.insert_expr(v)
361                else:
362                    # It's an expression pointer
363                    self._exprs.append(0x45000000_00000000 | kwarg_idxs[k])
364            return out_idx
365        else:
366            raise Exception("Invalid frame type")
367
368    def insert_frame(self, frame):
369        idx = self.insert_frame_expr(frame)
370        self._frame_exprs.append(idx)
371
372    def as_dict(self):
373        return {
374            "functions": self._functions,
375            "literals": self._literals,
376            "sources": self._sources,
377            "kwarg_keys": self._kwarg_keys,
378            "source_fracs": self._source_fracs,
379            "exprs": self._exprs,
380            "frame_exprs": self._frame_exprs,
381        }
382
383
class Source:
    """A video source, described by its id, frame format and timestamps.

    ``src`` is a mapping with ``width``, ``height``, ``pix_fmt`` and ``ts``
    (a sequence of numerator/denominator pairs). Indexing with a ``Fraction``
    yields a ``SourceExpr``; ``iloc`` is the companion ``_SourceILoc`` indexer.
    """

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._ts = [Fraction(entry[0], entry[1]) for entry in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the source id."""
        return self._name

    def fmt(self):
        """Return a copy of the format dict (width, height, pix_fmt)."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return f"Source({self._name})"
414
415
class Spec:
    """A video spec, described by its id, frame format and VOD endpoint.

    ``src`` is a mapping with ``width``, ``height``, ``pix_fmt`` and
    ``vod_endpoint``. The hls.js URL is derived from the scheme and host of
    the VOD endpoint.
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._vod_endpoint = src["vod_endpoint"]
        location = urlparse(self._vod_endpoint)
        self._hls_js_url = f"{location.scheme}://{location.netloc}/hls.js"

    def id(self) -> str:
        """Return the spec id."""
        return self._id

    def play(self, method):
        """Play the spec via ``_play``, using its playlist and status URLs.

        Both URLs are formed by appending to the VOD endpoint string as-is.
        """
        return _play(
            self._id,
            f"{self._vod_endpoint}playlist.m3u8",
            self._hls_js_url,
            method=method,
            status_url=f"{self._vod_endpoint}status",
        )
436
437
438class Server:
439    def __init__(self, endpoint: str, api_key: str):
440        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
441            raise Exception("Endpoint must start with http:// or https://")
442        if endpoint.endswith("/"):
443            raise Exception("Endpoint must not end with /")
444        self._endpoint = endpoint
445
446        self._api_key = api_key
447        self._session = requests.Session()
448        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
449        response = self._session.get(
450            f"{self._endpoint}/v2/auth",
451            headers={"Authorization": f"Bearer {self._api_key}"},
452        )
453        if not response.ok:
454            raise Exception(response.text)
455        response = response.json()
456        assert response["status"] == "ok"
457
458    def get_source(self, id: str) -> Source:
459        assert type(id) is str
460        response = self._session.get(
461            f"{self._endpoint}/v2/source/{id}",
462            headers={"Authorization": f"Bearer {self._api_key}"},
463        )
464        if not response.ok:
465            raise Exception(response.text)
466        response = response.json()
467        return Source(response["id"], response)
468
469    def list_sources(self) -> list[str]:
470        response = self._session.get(
471            f"{self._endpoint}/v2/source",
472            headers={"Authorization": f"Bearer {self._api_key}"},
473        )
474        if not response.ok:
475            raise Exception(response.text)
476        response = response.json()
477        return response
478
479    def delete_source(self, id: str):
480        assert type(id) is str
481        response = self._session.delete(
482            f"{self._endpoint}/v2/source/{id}",
483            headers={"Authorization": f"Bearer {self._api_key}"},
484        )
485        if not response.ok:
486            raise Exception(response.text)
487        response = response.json()
488        assert response["status"] == "ok"
489
490    def search_source(
491        self, name, stream_idx, storage_service, storage_config
492    ) -> list[str]:
493        assert type(name) is str
494        assert type(stream_idx) is int
495        assert type(storage_service) is str
496        assert type(storage_config) is dict
497        for k, v in storage_config.items():
498            assert type(k) is str
499            assert type(v) is str
500        req = {
501            "name": name,
502            "stream_idx": stream_idx,
503            "storage_service": storage_service,
504            "storage_config": storage_config,
505        }
506        response = self._session.post(
507            f"{self._endpoint}/v2/source/search",
508            json=req,
509            headers={"Authorization": f"Bearer {self._api_key}"},
510        )
511        if not response.ok:
512            raise Exception(response.text)
513        response = response.json()
514        return response
515
516    def create_source(
517        self, name, stream_idx, storage_service, storage_config
518    ) -> Source:
519        assert type(name) is str
520        assert type(stream_idx) is int
521        assert type(storage_service) is str
522        assert type(storage_config) is dict
523        for k, v in storage_config.items():
524            assert type(k) is str
525            assert type(v) is str
526        req = {
527            "name": name,
528            "stream_idx": stream_idx,
529            "storage_service": storage_service,
530            "storage_config": storage_config,
531        }
532        response = self._session.post(
533            f"{self._endpoint}/v2/source",
534            json=req,
535            headers={"Authorization": f"Bearer {self._api_key}"},
536        )
537        if not response.ok:
538            raise Exception(response.text)
539        response = response.json()
540        assert response["status"] == "ok"
541        id = response["id"]
542        return self.get_source(id)
543
544    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
545        """Convenience function for accessing sources.
546
547        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
548        If no source is found, creates a new source with the given parameters.
549        """
550
551        sources = self.search_source(name, stream_idx, storage_service, storage_config)
552        if len(sources) == 0:
553            return self.create_source(name, stream_idx, storage_service, storage_config)
554        return self.get_source(sources[0])
555
556    def get_spec(self, id: str) -> Spec:
557        assert type(id) is str
558        response = self._session.get(
559            f"{self._endpoint}/v2/spec/{id}",
560            headers={"Authorization": f"Bearer {self._api_key}"},
561        )
562        if not response.ok:
563            raise Exception(response.text)
564        response = response.json()
565        return Spec(response["id"], response)
566
567    def list_specs(self) -> list[str]:
568        response = self._session.get(
569            f"{self._endpoint}/v2/spec",
570            headers={"Authorization": f"Bearer {self._api_key}"},
571        )
572        if not response.ok:
573            raise Exception(response.text)
574        response = response.json()
575        return response
576
577    def create_spec(
578        self,
579        width,
580        height,
581        pix_fmt,
582        vod_segment_length,
583        frame_rate,
584        ready_hook=None,
585        steer_hook=None,
586        ttl=None,
587    ) -> Spec:
588        assert type(width) is int
589        assert type(height) is int
590        assert type(pix_fmt) is str
591        assert type(vod_segment_length) is Fraction
592        assert type(frame_rate) is Fraction
593        assert type(ready_hook) is str or ready_hook is None
594        assert type(steer_hook) is str or steer_hook is None
595        assert ttl is None or type(ttl) is int
596
597        req = {
598            "width": width,
599            "height": height,
600            "pix_fmt": pix_fmt,
601            "vod_segment_length": [
602                vod_segment_length.numerator,
603                vod_segment_length.denominator,
604            ],
605            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
606            "ready_hook": ready_hook,
607            "steer_hook": steer_hook,
608            "ttl": ttl,
609        }
610        response = self._session.post(
611            f"{self._endpoint}/v2/spec",
612            json=req,
613            headers={"Authorization": f"Bearer {self._api_key}"},
614        )
615        if not response.ok:
616            raise Exception(response.text)
617        response = response.json()
618        assert response["status"] == "ok"
619        return self.get_spec(response["id"])
620
621    def delete_spec(self, id: str):
622        assert type(id) is str
623        response = self._session.delete(
624            f"{self._endpoint}/v2/spec/{id}",
625            headers={"Authorization": f"Bearer {self._api_key}"},
626        )
627        if not response.ok:
628            raise Exception(response.text)
629        response = response.json()
630        assert response["status"] == "ok"
631
632    def export_spec(
633        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
634    ):
635        assert type(id) is str
636        assert type(path) is str
637        req = {
638            "path": path,
639            "encoder": encoder,
640            "encoder_opts": encoder_opts,
641            "format": format,
642        }
643        response = self._session.post(
644            f"{self._endpoint}/v2/spec/{id}/export",
645            json=req,
646            headers={"Authorization": f"Bearer {self._api_key}"},
647        )
648        if not response.ok:
649            raise Exception(response.text)
650        response = response.json()
651        assert response["status"] == "ok"
652
653    def push_spec_part(self, spec_id, pos, frames, terminal):
654        if type(spec_id) is Spec:
655            spec_id = spec_id._id
656        assert type(spec_id) is str
657        assert type(pos) is int
658        assert type(frames) is list
659        assert type(terminal) is bool
660
661        req_frames = []
662        for frame in frames:
663            assert type(frame) is tuple
664            assert len(frame) == 2
665            t = frame[0]
666            f = frame[1]
667            assert type(t) is Fraction
668            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
669            req_frames.append(
670                [
671                    [t.numerator, t.denominator],
672                    f._to_json_spec() if f is not None else None,
673                ]
674            )
675
676        req = {
677            "pos": pos,
678            "frames": req_frames,
679            "terminal": terminal,
680        }
681        response = self._session.post(
682            f"{self._endpoint}/v2/spec/{spec_id}/part",
683            json=req,
684            headers={"Authorization": f"Bearer {self._api_key}"},
685        )
686        if not response.ok:
687            raise Exception(response.text)
688        response = response.json()
689        assert response["status"] == "ok"
690
691    def push_spec_part_block(
692        self, spec_id: str, pos, blocks, terminal, compression="gzip"
693    ):
694        if type(spec_id) is Spec:
695            spec_id = spec_id._id
696        assert type(spec_id) is str
697        assert type(pos) is int
698        assert type(blocks) is list
699        assert type(terminal) is bool
700        assert compression is None or compression == "gzip"
701
702        req_blocks = []
703        for block in blocks:
704            assert type(block) is _FrameExpressionBlock
705            block_body = block.as_dict()
706            block_frames = len(block_body["frame_exprs"])
707            block_body = json.dumps(block_body).encode("utf-8")
708            if compression == "gzip":
709                block_body = gzip.compress(block_body, 1)
710            block_body = base64.b64encode(block_body).decode("utf-8")
711            req_blocks.append(
712                {
713                    "frames": block_frames,
714                    "compression": compression,
715                    "body": block_body,
716                }
717            )
718
719        req = {
720            "pos": pos,
721            "terminal": terminal,
722            "blocks": req_blocks,
723        }
724        response = self._session.post(
725            f"{self._endpoint}/v2/spec/{spec_id}/part_block",
726            json=req,
727            headers={"Authorization": f"Bearer {self._api_key}"},
728        )
729        if not response.ok:
730            raise Exception(response.text)
731        response = response.json()
732        assert response["status"] == "ok"
733
734    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
735        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
736        assert compression is None or compression in ["gzip"]
737        feb = _FrameExpressionBlock()
738        feb.insert_frame(frame_expr)
739        feb_body = feb.as_dict()
740
741        feb_body = json.dumps(feb_body).encode("utf-8")
742        if compression == "gzip":
743            feb_body = gzip.compress(feb_body, 1)
744        feb_body = base64.b64encode(feb_body).decode("utf-8")
745        req = {
746            "width": width,
747            "height": height,
748            "pix_fmt": pix_fmt,
749            "compression": compression,
750            "block": {
751                "frames": 1,
752                "compression": compression,
753                "body": feb_body,
754            },
755        }
756        response = self._session.post(
757            f"{self._endpoint}/v2/frame",
758            json=req,
759            headers={"Authorization": f"Bearer {self._api_key}"},
760        )
761        if not response.ok:
762            raise Exception(response.text)
763        response_body = response.content
764        assert type(response_body) is bytes
765        if compression == "gzip":
766            response_body = gzip.decompress(response_body)
767        return response_body
768
769
class SourceExpr:
    """Reference to one frame of a source.

    ``idx`` is an integer position when ``is_iloc`` is true, otherwise a
    value with ``numerator``/``denominator`` (a ``Fraction`` timestamp).
    """

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the JSON-ready dict form of this frame reference."""
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": self._source._name, "index": index}}

    def _sources(self):
        """Return the set holding the single source this expression reads."""
        return {self._source}

    def _filters(self):
        """Return an empty dict: a source reference uses no filters."""
        return {}
803
804
805class _SourceILoc:
806    def __init__(self, source):
807        self._source = source
808
809    def __getitem__(self, idx):
810        if type(idx) is not int:
811            raise Exception(f"Source iloc index must be an integer, got a {type(idx)}")
812        return SourceExpr(self._source, idx, True)
813
814
def _json_arg(arg, skip_data_anot=False):
    """Convert a filter argument into its JSON-ready dict form.

    Frame expressions become ``{"Frame": ...}``. Data values become a
    single-key dict tagged by type ("Int", "String", "Bytes", "Float",
    "Bool", "List"), wrapped in ``{"Data": ...}`` unless ``skip_data_anot``
    is true. List members are always converted without the ``Data`` wrapper.
    Raises ``Exception`` for any other type.
    """
    kind = type(arg)
    if kind is FilterExpr or kind is SourceExpr:
        return {"Frame": arg._to_json_spec()}

    plain_tags = {int: "Int", str: "String", float: "Float", bool: "Bool"}
    if kind in plain_tags:
        tagged = {plain_tags[kind]: arg}
    elif kind is bytes:
        # Bytes are sent as a list of integer byte values.
        tagged = {"Bytes": list(arg)}
    elif kind is tuple or kind is list:
        tagged = {"List": [_json_arg(member, True) for member in arg]}
    else:
        raise Exception(f"Unknown arg type: {type(arg)}")

    return tagged if skip_data_anot else {"Data": tagged}
845
846
class Filter:
    """A video filter."""

    def __init__(self, func: str):
        self._func = func

    def __call__(self, *args, **kwargs):
        """Return a ``FilterExpr`` applying this filter to the arguments."""
        return FilterExpr(self, args, kwargs)


class FilterExpr:
    """Application of a ``Filter`` to positional and keyword arguments.

    The filter's name is read from ``Filter._func``, the only name attribute
    ``Filter`` defines. Earlier code read a non-existent ``_name`` attribute,
    which made ``repr()``, ``_to_json_spec()`` and ``_filters()`` raise
    ``AttributeError``.
    """

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def __repr__(self):
        def show(value):
            # Strings are shown double-quoted; everything else via str().
            return f'"{value}"' if type(value) is str else str(value)

        parts = [show(arg) for arg in self._args]
        parts.extend(f"{key}={show(value)}" for key, value in self._kwargs.items())
        return f"{self._filter._func}({', '.join(parts)})"

    def _to_json_spec(self):
        """Return the JSON-ready dict form of this filter application."""
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {key: _json_arg(value) for key, value in self._kwargs.items()}
        return {"Filter": {"name": self._filter._func, "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Return the union of sources used by all frame-expression operands."""
        found = set()
        for operand in (*self._args, *self._kwargs.values()):
            if type(operand) is FilterExpr or type(operand) is SourceExpr:
                found = found.union(operand._sources())
        return found

    def _filters(self):
        """Return a name -> ``Filter`` dict for this and all nested filters."""
        found = {self._filter._func: self._filter}
        for operand in (*self._args, *self._kwargs.values()):
            if type(operand) is FilterExpr:
                found = {**found, **operand._filters()}
        return found
class Source:
class Source:
    """Client-side description of a video source.

    Built from an identifier and a mapping that provides ``width``,
    ``height``, ``pix_fmt`` and ``ts`` (pairs turned into `Fraction` values).
    Indexing with a `Fraction` yields a `SourceExpr`; ``iloc`` is a
    `_SourceILoc` bound to this source.
    """

    def __init__(self, id: str, src):
        self._name = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        self._ts = [Fraction(pair[0], pair[1]) for pair in src["ts"]]
        self.iloc = _SourceILoc(self)

    def id(self) -> str:
        """Return the identifier given at construction."""
        return self._name

    def fmt(self):
        """Return a copy of the width/height/pix_fmt mapping."""
        return dict(self._fmt)

    def ts(self) -> list[Fraction]:
        """Return a copy of the timestamp list."""
        return list(self._ts)

    def __len__(self):
        return len(self._ts)

    def __getitem__(self, idx):
        # Only exact Fraction instances are accepted as an index.
        if type(idx) is Fraction:
            return SourceExpr(self, idx, False)
        raise Exception("Source index must be a Fraction")

    def __repr__(self):
        return "Source({})".format(self._name)
Source(id: str, src)
386    def __init__(self, id: str, src):
387        self._name = id
388        self._fmt = {
389            "width": src["width"],
390            "height": src["height"],
391            "pix_fmt": src["pix_fmt"],
392        }
393        self._ts = [Fraction(x[0], x[1]) for x in src["ts"]]
394        self.iloc = _SourceILoc(self)
iloc
def id(self) -> str:
396    def id(self) -> str:
397        return self._name
def fmt(self):
399    def fmt(self):
400        return {**self._fmt}
def ts(self) -> list[fractions.Fraction]:
402    def ts(self) -> list[Fraction]:
403        return self._ts.copy()
class Spec:
class Spec:
    """Client-side description of a spec and where it can be played from.

    Built from an identifier and a mapping that provides ``width``,
    ``height``, ``pix_fmt`` and ``vod_endpoint``.
    """

    def __init__(self, id: str, src):
        self._id = id
        self._fmt = {key: src[key] for key in ("width", "height", "pix_fmt")}
        endpoint = src["vod_endpoint"]
        self._vod_endpoint = endpoint
        # hls.js is addressed at the root of the host serving the VOD endpoint.
        parts = urlparse(endpoint)
        self._hls_js_url = "{}://{}/hls.js".format(parts.scheme, parts.netloc)

    def id(self) -> str:
        """Return the identifier given at construction."""
        return self._id

    def play(self, method):
        """Hand the playlist and status URLs of this spec to `_play`."""
        base = self._vod_endpoint
        return _play(
            self._id,
            f"{base}playlist.m3u8",
            self._hls_js_url,
            method=method,
            status_url=f"{base}status",
        )
Spec(id: str, src)
418    def __init__(self, id: str, src):
419        self._id = id
420        self._fmt = {
421            "width": src["width"],
422            "height": src["height"],
423            "pix_fmt": src["pix_fmt"],
424        }
425        self._vod_endpoint = src["vod_endpoint"]
426        parsed_url = urlparse(self._vod_endpoint)
427        self._hls_js_url = f"{parsed_url.scheme}://{parsed_url.netloc}/hls.js"
def id(self) -> str:
429    def id(self) -> str:
430        return self._id
def play(self, method):
432    def play(self, method):
433        url = f"{self._vod_endpoint}playlist.m3u8"
434        status_url = f"{self._vod_endpoint}status"
435        hls_js_url = self._hls_js_url
436        return _play(self._id, url, hls_js_url, method=method, status_url=status_url)
class Server:
class Server:
    """Client for the ``/v2`` HTTP API of a vidformer server.

    All requests go through one `requests.Session` and carry the API key as a
    bearer token. A response whose status is not OK raises `Exception` with
    the response text as its message. Argument types are checked with
    ``assert`` statements.
    """

    def __init__(self, endpoint: str, api_key: str):
        """Open a session and check ``api_key`` against ``/v2/auth``.

        Raises:
            Exception: if ``endpoint`` does not start with ``http://`` or
                ``https://``, if it ends with ``/``, or if the auth request
                does not succeed.
        """
        if not endpoint.startswith(("http://", "https://")):
            raise Exception("Endpoint must start with http:// or https://")
        if endpoint.endswith("/"):
            raise Exception("Endpoint must not end with /")
        self._endpoint = endpoint

        self._api_key = api_key
        self._session = requests.Session()
        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
        self._request_ok("get", "/v2/auth")

    # -- request helpers ----------------------------------------------------

    def _request(self, method, path, body=None):
        """Send one request and return the response if its status is OK.

        ``method`` names the session method to call ("get", "post",
        "delete"); ``body`` is sent as JSON when it is not None.
        """
        kwargs = {"headers": {"Authorization": f"Bearer {self._api_key}"}}
        if body is not None:
            kwargs["json"] = body
        response = getattr(self._session, method)(f"{self._endpoint}{path}", **kwargs)
        if not response.ok:
            raise Exception(response.text)
        return response

    def _request_json(self, method, path, body=None):
        """Like `_request`, but return the decoded JSON body."""
        return self._request(method, path, body).json()

    def _request_ok(self, method, path, body=None):
        """Like `_request_json`, additionally asserting ``status == "ok"``."""
        reply = self._request_json(method, path, body)
        assert reply["status"] == "ok"
        return reply

    @staticmethod
    def _source_request(name, stream_idx, storage_service, storage_config):
        """Type-check source parameters and return them as a request body."""
        assert type(name) is str
        assert type(stream_idx) is int
        assert type(storage_service) is str
        assert type(storage_config) is dict
        for k, v in storage_config.items():
            assert type(k) is str
            assert type(v) is str
        return {
            "name": name,
            "stream_idx": stream_idx,
            "storage_service": storage_service,
            "storage_config": storage_config,
        }

    @staticmethod
    def _encode_block(block_dict, compression):
        """JSON-encode a block, gzip it if requested, and base64 the result."""
        raw = json.dumps(block_dict).encode("utf-8")
        if compression == "gzip":
            raw = gzip.compress(raw, 1)
        return base64.b64encode(raw).decode("utf-8")

    # -- sources ------------------------------------------------------------

    def get_source(self, id: str) -> Source:
        """Fetch ``/v2/source/{id}`` and wrap the reply in a `Source`."""
        assert type(id) is str
        reply = self._request_json("get", f"/v2/source/{id}")
        return Source(reply["id"], reply)

    def list_sources(self) -> list[str]:
        """Return the decoded JSON body of ``GET /v2/source``."""
        return self._request_json("get", "/v2/source")

    def delete_source(self, id: str):
        """Send ``DELETE /v2/source/{id}`` and require an ``ok`` status."""
        assert type(id) is str
        self._request_ok("delete", f"/v2/source/{id}")

    def search_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> list[str]:
        """Return the decoded JSON body of ``POST /v2/source/search``."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        return self._request_json("post", "/v2/source/search", req)

    def create_source(
        self, name, stream_idx, storage_service, storage_config
    ) -> Source:
        """Create a source via ``POST /v2/source`` and return it as a `Source`."""
        req = self._source_request(name, stream_idx, storage_service, storage_config)
        reply = self._request_ok("post", "/v2/source", req)
        return self.get_source(reply["id"])

    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
        """Convenience function for accessing sources.

        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
        If no source is found, creates a new source with the given parameters.
        """

        sources = self.search_source(name, stream_idx, storage_service, storage_config)
        if len(sources) == 0:
            return self.create_source(name, stream_idx, storage_service, storage_config)
        return self.get_source(sources[0])

    # -- specs --------------------------------------------------------------

    def get_spec(self, id: str) -> Spec:
        """Fetch ``/v2/spec/{id}`` and wrap the reply in a `Spec`."""
        assert type(id) is str
        reply = self._request_json("get", f"/v2/spec/{id}")
        return Spec(reply["id"], reply)

    def list_specs(self) -> list[str]:
        """Return the decoded JSON body of ``GET /v2/spec``."""
        return self._request_json("get", "/v2/spec")

    def create_spec(
        self,
        width,
        height,
        pix_fmt,
        vod_segment_length,
        frame_rate,
        ready_hook=None,
        steer_hook=None,
        ttl=None,
    ) -> Spec:
        """Create a spec via ``POST /v2/spec`` and return it as a `Spec`.

        ``vod_segment_length`` and ``frame_rate`` must be `Fraction` values;
        they are sent as ``[numerator, denominator]`` pairs.
        """
        assert type(width) is int
        assert type(height) is int
        assert type(pix_fmt) is str
        assert type(vod_segment_length) is Fraction
        assert type(frame_rate) is Fraction
        assert type(ready_hook) is str or ready_hook is None
        assert type(steer_hook) is str or steer_hook is None
        assert ttl is None or type(ttl) is int

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "vod_segment_length": [
                vod_segment_length.numerator,
                vod_segment_length.denominator,
            ],
            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
            "ready_hook": ready_hook,
            "steer_hook": steer_hook,
            "ttl": ttl,
        }
        reply = self._request_ok("post", "/v2/spec", req)
        return self.get_spec(reply["id"])

    def delete_spec(self, id: str):
        """Send ``DELETE /v2/spec/{id}`` and require an ``ok`` status."""
        assert type(id) is str
        self._request_ok("delete", f"/v2/spec/{id}")

    def export_spec(
        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
    ):
        """Send ``POST /v2/spec/{id}/export`` and require an ``ok`` status."""
        assert type(id) is str
        assert type(path) is str
        req = {
            "path": path,
            "encoder": encoder,
            "encoder_opts": encoder_opts,
            "format": format,
        }
        self._request_ok("post", f"/v2/spec/{id}/export", req)

    def push_spec_part(self, spec_id, pos, frames, terminal):
        """Send ``(time, expression)`` frames to ``/v2/spec/{spec_id}/part``.

        ``spec_id`` may be a `Spec` or its string id. Each frame is a 2-tuple
        of a `Fraction` and a `SourceExpr`, a `FilterExpr`, or None.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(frames) is list
        assert type(terminal) is bool

        req_frames = []
        for frame in frames:
            assert type(frame) is tuple
            assert len(frame) == 2
            t, f = frame
            assert type(t) is Fraction
            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
            req_frames.append(
                [
                    [t.numerator, t.denominator],
                    f._to_json_spec() if f is not None else None,
                ]
            )

        req = {
            "pos": pos,
            "frames": req_frames,
            "terminal": terminal,
        }
        self._request_ok("post", f"/v2/spec/{spec_id}/part", req)

    def push_spec_part_block(
        self, spec_id: str, pos, blocks, terminal, compression="gzip"
    ):
        """Send encoded frame-expression blocks to ``/v2/spec/{spec_id}/part_block``.

        ``spec_id`` may be a `Spec` or its string id. ``compression`` must be
        None or ``"gzip"``.
        """
        if type(spec_id) is Spec:
            spec_id = spec_id._id
        assert type(spec_id) is str
        assert type(pos) is int
        assert type(blocks) is list
        assert type(terminal) is bool
        assert compression is None or compression == "gzip"

        req_blocks = []
        for block in blocks:
            assert type(block) is _FrameExpressionBlock
            block_body = block.as_dict()
            block_frames = len(block_body["frame_exprs"])
            req_blocks.append(
                {
                    "frames": block_frames,
                    "compression": compression,
                    "body": self._encode_block(block_body, compression),
                }
            )

        req = {
            "pos": pos,
            "terminal": terminal,
            "blocks": req_blocks,
        }
        self._request_ok("post", f"/v2/spec/{spec_id}/part_block", req)

    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
        """Post one frame expression to ``/v2/frame`` and return the body bytes.

        The response content is gunzipped when ``compression`` is ``"gzip"``.
        """
        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
        assert compression is None or compression in ["gzip"]
        feb = _FrameExpressionBlock()
        feb.insert_frame(frame_expr)
        feb_body = self._encode_block(feb.as_dict(), compression)

        req = {
            "width": width,
            "height": height,
            "pix_fmt": pix_fmt,
            "compression": compression,
            "block": {
                "frames": 1,
                "compression": compression,
                "body": feb_body,
            },
        }
        response_body = self._request("post", "/v2/frame", req).content
        assert type(response_body) is bytes
        if compression == "gzip":
            response_body = gzip.decompress(response_body)
        return response_body
Server(endpoint: str, api_key: str)
440    def __init__(self, endpoint: str, api_key: str):
441        if not endpoint.startswith("http://") and not endpoint.startswith("https://"):
442            raise Exception("Endpoint must start with http:// or https://")
443        if endpoint.endswith("/"):
444            raise Exception("Endpoint must not end with /")
445        self._endpoint = endpoint
446
447        self._api_key = api_key
448        self._session = requests.Session()
449        self._session.headers.update({"Authorization": f"Bearer {self._api_key}"})
450        response = self._session.get(
451            f"{self._endpoint}/v2/auth",
452            headers={"Authorization": f"Bearer {self._api_key}"},
453        )
454        if not response.ok:
455            raise Exception(response.text)
456        response = response.json()
457        assert response["status"] == "ok"
def get_source(self, id: str) -> Source:
459    def get_source(self, id: str) -> Source:
460        assert type(id) is str
461        response = self._session.get(
462            f"{self._endpoint}/v2/source/{id}",
463            headers={"Authorization": f"Bearer {self._api_key}"},
464        )
465        if not response.ok:
466            raise Exception(response.text)
467        response = response.json()
468        return Source(response["id"], response)
def list_sources(self) -> list[str]:
470    def list_sources(self) -> list[str]:
471        response = self._session.get(
472            f"{self._endpoint}/v2/source",
473            headers={"Authorization": f"Bearer {self._api_key}"},
474        )
475        if not response.ok:
476            raise Exception(response.text)
477        response = response.json()
478        return response
def delete_source(self, id: str):
480    def delete_source(self, id: str):
481        assert type(id) is str
482        response = self._session.delete(
483            f"{self._endpoint}/v2/source/{id}",
484            headers={"Authorization": f"Bearer {self._api_key}"},
485        )
486        if not response.ok:
487            raise Exception(response.text)
488        response = response.json()
489        assert response["status"] == "ok"
def search_source(self, name, stream_idx, storage_service, storage_config) -> list[str]:
491    def search_source(
492        self, name, stream_idx, storage_service, storage_config
493    ) -> list[str]:
494        assert type(name) is str
495        assert type(stream_idx) is int
496        assert type(storage_service) is str
497        assert type(storage_config) is dict
498        for k, v in storage_config.items():
499            assert type(k) is str
500            assert type(v) is str
501        req = {
502            "name": name,
503            "stream_idx": stream_idx,
504            "storage_service": storage_service,
505            "storage_config": storage_config,
506        }
507        response = self._session.post(
508            f"{self._endpoint}/v2/source/search",
509            json=req,
510            headers={"Authorization": f"Bearer {self._api_key}"},
511        )
512        if not response.ok:
513            raise Exception(response.text)
514        response = response.json()
515        return response
def create_source(self, name, stream_idx, storage_service, storage_config) -> Source:
517    def create_source(
518        self, name, stream_idx, storage_service, storage_config
519    ) -> Source:
520        assert type(name) is str
521        assert type(stream_idx) is int
522        assert type(storage_service) is str
523        assert type(storage_config) is dict
524        for k, v in storage_config.items():
525            assert type(k) is str
526            assert type(v) is str
527        req = {
528            "name": name,
529            "stream_idx": stream_idx,
530            "storage_service": storage_service,
531            "storage_config": storage_config,
532        }
533        response = self._session.post(
534            f"{self._endpoint}/v2/source",
535            json=req,
536            headers={"Authorization": f"Bearer {self._api_key}"},
537        )
538        if not response.ok:
539            raise Exception(response.text)
540        response = response.json()
541        assert response["status"] == "ok"
542        id = response["id"]
543        return self.get_source(id)
def source(self, name, stream_idx, storage_service, storage_config) -> Source:
545    def source(self, name, stream_idx, storage_service, storage_config) -> Source:
546        """Convenience function for accessing sources.
547
548        Tries to find a source with the given name, stream_idx, storage_service, and storage_config.
549        If no source is found, creates a new source with the given parameters.
550        """
551
552        sources = self.search_source(name, stream_idx, storage_service, storage_config)
553        if len(sources) == 0:
554            return self.create_source(name, stream_idx, storage_service, storage_config)
555        return self.get_source(sources[0])

Convenience function for accessing sources.

Tries to find a source with the given name, stream_idx, storage_service, and storage_config. If no source is found, creates a new source with the given parameters.

def get_spec(self, id: str) -> Spec:
557    def get_spec(self, id: str) -> Spec:
558        assert type(id) is str
559        response = self._session.get(
560            f"{self._endpoint}/v2/spec/{id}",
561            headers={"Authorization": f"Bearer {self._api_key}"},
562        )
563        if not response.ok:
564            raise Exception(response.text)
565        response = response.json()
566        return Spec(response["id"], response)
def list_specs(self) -> list[str]:
568    def list_specs(self) -> list[str]:
569        response = self._session.get(
570            f"{self._endpoint}/v2/spec",
571            headers={"Authorization": f"Bearer {self._api_key}"},
572        )
573        if not response.ok:
574            raise Exception(response.text)
575        response = response.json()
576        return response
def create_spec(self, width, height, pix_fmt, vod_segment_length, frame_rate, ready_hook=None, steer_hook=None, ttl=None) -> Spec:
578    def create_spec(
579        self,
580        width,
581        height,
582        pix_fmt,
583        vod_segment_length,
584        frame_rate,
585        ready_hook=None,
586        steer_hook=None,
587        ttl=None,
588    ) -> Spec:
589        assert type(width) is int
590        assert type(height) is int
591        assert type(pix_fmt) is str
592        assert type(vod_segment_length) is Fraction
593        assert type(frame_rate) is Fraction
594        assert type(ready_hook) is str or ready_hook is None
595        assert type(steer_hook) is str or steer_hook is None
596        assert ttl is None or type(ttl) is int
597
598        req = {
599            "width": width,
600            "height": height,
601            "pix_fmt": pix_fmt,
602            "vod_segment_length": [
603                vod_segment_length.numerator,
604                vod_segment_length.denominator,
605            ],
606            "frame_rate": [frame_rate.numerator, frame_rate.denominator],
607            "ready_hook": ready_hook,
608            "steer_hook": steer_hook,
609            "ttl": ttl,
610        }
611        response = self._session.post(
612            f"{self._endpoint}/v2/spec",
613            json=req,
614            headers={"Authorization": f"Bearer {self._api_key}"},
615        )
616        if not response.ok:
617            raise Exception(response.text)
618        response = response.json()
619        assert response["status"] == "ok"
620        return self.get_spec(response["id"])
def delete_spec(self, id: str):
622    def delete_spec(self, id: str):
623        assert type(id) is str
624        response = self._session.delete(
625            f"{self._endpoint}/v2/spec/{id}",
626            headers={"Authorization": f"Bearer {self._api_key}"},
627        )
628        if not response.ok:
629            raise Exception(response.text)
630        response = response.json()
631        assert response["status"] == "ok"
def export_spec(self, id: str, path: str, encoder=None, encoder_opts=None, format=None):
633    def export_spec(
634        self, id: str, path: str, encoder=None, encoder_opts=None, format=None
635    ):
636        assert type(id) is str
637        assert type(path) is str
638        req = {
639            "path": path,
640            "encoder": encoder,
641            "encoder_opts": encoder_opts,
642            "format": format,
643        }
644        response = self._session.post(
645            f"{self._endpoint}/v2/spec/{id}/export",
646            json=req,
647            headers={"Authorization": f"Bearer {self._api_key}"},
648        )
649        if not response.ok:
650            raise Exception(response.text)
651        response = response.json()
652        assert response["status"] == "ok"
def push_spec_part(self, spec_id, pos, frames, terminal):
654    def push_spec_part(self, spec_id, pos, frames, terminal):
655        if type(spec_id) is Spec:
656            spec_id = spec_id._id
657        assert type(spec_id) is str
658        assert type(pos) is int
659        assert type(frames) is list
660        assert type(terminal) is bool
661
662        req_frames = []
663        for frame in frames:
664            assert type(frame) is tuple
665            assert len(frame) == 2
666            t = frame[0]
667            f = frame[1]
668            assert type(t) is Fraction
669            assert f is None or type(f) is SourceExpr or type(f) is FilterExpr
670            req_frames.append(
671                [
672                    [t.numerator, t.denominator],
673                    f._to_json_spec() if f is not None else None,
674                ]
675            )
676
677        req = {
678            "pos": pos,
679            "frames": req_frames,
680            "terminal": terminal,
681        }
682        response = self._session.post(
683            f"{self._endpoint}/v2/spec/{spec_id}/part",
684            json=req,
685            headers={"Authorization": f"Bearer {self._api_key}"},
686        )
687        if not response.ok:
688            raise Exception(response.text)
689        response = response.json()
690        assert response["status"] == "ok"
def push_spec_part_block(self, spec_id: str, pos, blocks, terminal, compression='gzip'):
692    def push_spec_part_block(
693        self, spec_id: str, pos, blocks, terminal, compression="gzip"
694    ):
695        if type(spec_id) is Spec:
696            spec_id = spec_id._id
697        assert type(spec_id) is str
698        assert type(pos) is int
699        assert type(blocks) is list
700        assert type(terminal) is bool
701        assert compression is None or compression == "gzip"
702
703        req_blocks = []
704        for block in blocks:
705            assert type(block) is _FrameExpressionBlock
706            block_body = block.as_dict()
707            block_frames = len(block_body["frame_exprs"])
708            block_body = json.dumps(block_body).encode("utf-8")
709            if compression == "gzip":
710                block_body = gzip.compress(block_body, 1)
711            block_body = base64.b64encode(block_body).decode("utf-8")
712            req_blocks.append(
713                {
714                    "frames": block_frames,
715                    "compression": compression,
716                    "body": block_body,
717                }
718            )
719
720        req = {
721            "pos": pos,
722            "terminal": terminal,
723            "blocks": req_blocks,
724        }
725        response = self._session.post(
726            f"{self._endpoint}/v2/spec/{spec_id}/part_block",
727            json=req,
728            headers={"Authorization": f"Bearer {self._api_key}"},
729        )
730        if not response.ok:
731            raise Exception(response.text)
732        response = response.json()
733        assert response["status"] == "ok"
def frame(self, width, height, pix_fmt, frame_expr, compression='gzip'):
735    def frame(self, width, height, pix_fmt, frame_expr, compression="gzip"):
736        assert type(frame_expr) is FilterExpr or type(frame_expr) is SourceExpr
737        assert compression is None or compression in ["gzip"]
738        feb = _FrameExpressionBlock()
739        feb.insert_frame(frame_expr)
740        feb_body = feb.as_dict()
741
742        feb_body = json.dumps(feb_body).encode("utf-8")
743        if compression == "gzip":
744            feb_body = gzip.compress(feb_body, 1)
745        feb_body = base64.b64encode(feb_body).decode("utf-8")
746        req = {
747            "width": width,
748            "height": height,
749            "pix_fmt": pix_fmt,
750            "compression": compression,
751            "block": {
752                "frames": 1,
753                "compression": compression,
754                "body": feb_body,
755            },
756        }
757        response = self._session.post(
758            f"{self._endpoint}/v2/frame",
759            json=req,
760            headers={"Authorization": f"Bearer {self._api_key}"},
761        )
762        if not response.ok:
763            raise Exception(response.text)
764        response_body = response.content
765        assert type(response_body) is bytes
766        if compression == "gzip":
767            response_body = gzip.decompress(response_body)
768        return response_body
class SourceExpr:
class SourceExpr:
    """A reference to one frame of a source.

    ``idx`` is treated as an integer position when ``is_iloc`` is true and as
    a value with ``numerator`` / ``denominator`` otherwise.
    """

    def __init__(self, source, idx, is_iloc):
        self._source = source
        self._idx = idx
        self._is_iloc = is_iloc

    def __repr__(self):
        accessor = ".iloc" if self._is_iloc else ""
        return f"{self._source._name}{accessor}[{self._idx}]"

    def _to_json_spec(self):
        """Return the ``{"Source": {...}}`` dict for this expression."""
        video = self._source._name
        if self._is_iloc:
            index = {"ILoc": int(self._idx)}
        else:
            index = {"T": [self._idx.numerator, self._idx.denominator]}
        return {"Source": {"video": video, "index": index}}

    def _sources(self):
        return {self._source}

    def _filters(self):
        # A source reference uses no filters.
        return dict()
SourceExpr(source, idx, is_iloc)
772    def __init__(self, source, idx, is_iloc):
773        self._source = source
774        self._idx = idx
775        self._is_iloc = is_iloc
class Filter:
class Filter:
    """A video filter.

    Calling a ``Filter`` does not run anything: it records the call as a
    `FilterExpr` that holds this filter together with the given arguments.
    """

    def __init__(self, func: str):
        # Identifier of the filter, stored exactly as given.
        self._func = func

    @property
    def _name(self) -> str:
        # Some code reads the filter identifier as ``_name`` rather than
        # ``_func``; expose it as a read-only alias so both spellings resolve
        # to the same value instead of raising AttributeError.
        return self._func

    def __call__(self, *args, **kwargs):
        """Return a `FilterExpr` for this filter applied to the arguments."""
        return FilterExpr(self, args, kwargs)

A video filter.

Filter(func: str)
851    def __init__(self, func: str):
852        self._func = func
class FilterExpr:
class FilterExpr:
    """A recorded call of a `Filter` with positional and keyword arguments.

    Arguments may themselves be `FilterExpr` or `SourceExpr` objects, which
    makes the expression a tree.
    """

    def __init__(self, filter: Filter, args, kwargs):
        self._filter = filter
        self._args = args
        self._kwargs = kwargs

    def _filter_name(self):
        # `Filter.__init__` stores its identifier as `_func`; it never sets a
        # `_name` attribute, so reading `_name` here raised AttributeError.
        return self._filter._func

    def _all_args(self):
        # Positional arguments first, then keyword values, in insertion order.
        return (*self._args, *self._kwargs.values())

    def __repr__(self):
        def show(value):
            # Strings are wrapped in double quotes; everything else uses str().
            return f'"{value}"' if type(value) is str else str(value)

        parts = [show(arg) for arg in self._args]
        parts.extend(f"{k}={show(v)}" for k, v in self._kwargs.items())
        return f"{self._filter_name()}({', '.join(parts)})"

    def _to_json_spec(self):
        """Return the ``{"Filter": {...}}`` dict for this expression."""
        args = [_json_arg(arg) for arg in self._args]
        kwargs = {k: _json_arg(v) for k, v in self._kwargs.items()}
        return {"Filter": {"name": self._filter_name(), "args": args, "kwargs": kwargs}}

    def _sources(self):
        """Return the union of `_sources()` over all expression arguments."""
        found = set()
        for arg in self._all_args():
            if type(arg) is FilterExpr or type(arg) is SourceExpr:
                found.update(arg._sources())
        return found

    def _filters(self):
        """Return ``{identifier: filter}`` for this filter and nested ones."""
        found = {self._filter_name(): self._filter}
        for arg in self._all_args():
            if type(arg) is FilterExpr:
                found.update(arg._filters())
        return found
FilterExpr(filter: Filter, args, kwargs)
859    def __init__(self, filter: Filter, args, kwargs):
860        self._filter = filter
861        self._args = args
862        self._kwargs = kwargs