.. _exemplary-plugins: Example ------------------------------------ The following example illustrates the basics of writing a plugin, as well as the use of classes and methods. This plugin requests face features from ``findface-sf-api`` and then sends a request to ``/video-detector/process`` to create an event with the data obtained from ``findface-sf-api``. You can find this plugin at ``/opt/findface-security/fr_plugin/ffsec_fr_plugin.py``. Embed it as described :ref:`here ` and try it out. .. important:: Make sure that the ``FFSEC_URL`` variable contains the actual IP address and port of the ``findface-security`` host. .. code:: import datetime import logging import aiohttp from dateutil.tz import tzutc from facerouter.plugin import Plugin from ntech import sfapi_client from ntech.asyncio_utils import wrap_futures from ntech.asyncio_utils.noop_cookie import NoopCookieJar from ntech.tornado_utils import asyncio_to_tornado # change this if your ffsecurity is located on another host or listens on a non-default port FFSEC_URL = 'http://127.0.0.1:8002' logger = logging.getLogger(__name__) class FFSecurityPlugin(Plugin): def __init__(self, ctx, ffsec_url): super().__init__(ctx) self.ffsec_url = ffsec_url.rstrip('/') self.session = aiohttp.ClientSession(cookie_jar=NoopCookieJar()) self.future_wrapper = asyncio_to_tornado def deactivate(self, *args): self.session.close() def request_headers(self, request): return { "Authorization": request.headers['Authorization'], 'X-Request-ID': request.request_id, } @wrap_futures async def preprocess(self, request, labels): # somewhat hacky way to pass data between preprocess and process: request.ffsec_reception_timestamp = datetime.datetime.now(tzutc()) headers = self.request_headers(request) async with self.session.post(self.ffsec_url + '/video-detector/preprocess', headers=headers) as resp: resp.raise_for_status() resp_json = await resp.json() logger.debug("request_id=%r preprocess: ffsecurity response: %r", request.request_id, resp_json) plugin_wants = resp_json['plugin_wants'] request.ffsec_plugin_wants = plugin_wants logger.info("request_id=%r preprocess: ffsecurity requested features: %r", request.request_id, plugin_wants) return plugin_wants @wrap_futures async def process(self, request, photo, bbox, event_id, detection: sfapi_client.DetectFace): headers = self.request_headers(request) with aiohttp.MultipartWriter('form-data') as mpwriter: part = aiohttp.payload.BytesPayload(request.params.photo) part.set_content_disposition('form-data', name='photo', filename='photo.jpg') mpwriter.append(part) part = aiohttp.payload.BytesPayload(b'') part.set_content_disposition('form-data', name='normalized', filename='norm.png') mpwriter.append(part) part = aiohttp.payload.JsonPayload(request.params.detectorParams) part.set_content_disposition('form-data', name='detectorParams') mpwriter.append(part) part = aiohttp.payload.JsonPayload([list(bbox)]) part.set_content_disposition('form-data', name='bbox') mpwriter.append(part) part = aiohttp.payload.StringPayload(request.params.cam_id) part.set_content_disposition('form-data', name='cam_id') mpwriter.append(part) part = aiohttp.payload.StringPayload(request.params.timestamp.isoformat()) part.set_content_disposition('form-data', name='timestamp') mpwriter.append(part) part = aiohttp.payload.StringPayload(request.ffsec_reception_timestamp.isoformat()) part.set_content_disposition('form-data', name='reception_timestamp') mpwriter.append(part) part = aiohttp.payload.JsonPayload(request.ffsec_plugin_wants) part.set_content_disposition('form-data', name='plugin_wants') mpwriter.append(part) if request.params.bs_type is not None: part = aiohttp.payload.StringPayload(request.params.bs_type) part.set_content_disposition('form-data', name='bs_type') mpwriter.append(part) part = aiohttp.payload.JsonPayload({ 'id': getattr(detection, 'id', None), 'features': detection.features, 'bbox': detection.bbox._asdict(), 'facen': getattr(detection, 'facen', None), 'attributes': detection.attributes, }) part.set_content_disposition('form-data', name='detection') mpwriter.append(part) async with self.session.post( self.ffsec_url + '/video-detector/process', data=mpwriter, headers=headers ) as resp: await resp.read() resp.raise_for_status() logger.info("request_id=%r process: ffsecurity accepted event", request.request_id) async def activate(app, ctx, plugin_name, plugin_source): plugin = FFSecurityPlugin(ctx=ctx, ffsec_url=FFSEC_URL) return plugin