Example

The following example illustrates the basics of writing a plugin, as well as the use of classes and methods. This plugin requests face features from findface-sf-api and then sends a request to <FFSEC_URL>/video-detector/process to create an event with the data obtained from findface-sf-api.

You can find this plugin at /opt/ffsecurity/fr_plugin/ffsec_fr_plugin.py. Embed it as described here and try it out.

Important

Make sure that the FFSEC_URL variable contains the actual IP address and port of the findface-security host.

import datetime
import logging
import aiohttp
from dateutil.tz import tzutc
from facerouter.plugin import Plugin
from ntech import sfapi_client
from ntech.asyncio_utils import wrap_future
from ntech.asyncio_utils.noop_cookie import NoopCookieJar
from ntech.tornado_utils import asyncio_to_tornado
# change this if your ffsecurity is located on another host or listens on a non-default port
FFSEC_URL = 'http://127.0.0.1:8002'
logger = logging.getLogger(__name__)
class FFSecurityPlugin(Plugin):
    def __init__(self, ctx, ffsec_url):
        super().__init__(ctx)
        self.ffsec_url = ffsec_url.rstrip('/')
        self.session = aiohttp.ClientSession(cookie_jar=NoopCookieJar())
        self.future_wrapper = asyncio_to_tornado
    def deactivate(self, *args):
        self.session.close()
    def request_headers(self, request):
        return {
            "Authorization": request.headers['Authorization'],
            'X-Request-ID': request.request_id,
        }
    @wrap_futures
    async def preprocess(self, request, labels):
        # somewhat hacky way to pass data between preprocess and process:
        request.ffsec_reception_timestamp = datetime.datetime.now(tzutc())
        headers = self.request_headers(request)
        async with self.session.post(self.ffsec_url + '/video-detector/preprocess', headers=headers) as resp:
            resp.raise_for_status()
            resp_json = await resp.json()
            logger.debug("request_id=%r preprocess: ffsecurity response: %r", request.request_id, resp_json)
            plugin_wants = resp_json['plugin_wants']
            request.ffsec_plugin_wants = plugin_wants
            logger.info("request_id=%r preprocess: ffsecurity requested features: %r", request.request_id, plugin_wants)
        return plugin_wants
    @wrap_futures
    async def process(self, request, photo, bbox, event_id, detection: sfapi_client.DetectFace):
        headers = self.request_headers(request)
        with aiohttp.MultipartWriter('form-data') as mpwriter:
            part = aiohttp.payload.BytesPayload(request.params.photo)
            part.set_content_disposition('form-data', name='photo', filename='photo.jpg')
            mpwriter.append(part)
            part = aiohttp.payload.BytesPayload(b'')
            part.set_content_disposition('form-data', name='face0', filename='norm.png')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload(request.params.detectorParams)
            part.set_content_disposition('form-data', name='detectorParams')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload([list(bbox)])
            part.set_content_disposition('form-data', name='bbox')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.params.cam_id)
            part.set_content_disposition('form-data', name='cam_id')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.params.timestamp.isoformat())
            part.set_content_disposition('form-data', name='timestamp')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.ffsec_reception_timestamp.isoformat())
            part.set_content_disposition('form-data', name='reception_timestamp')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload(request.ffsec_plugin_wants)
            part.set_content_disposition('form-data', name='plugin_wants')
            mpwriter.append(part)
            if request.params.bs_type is not None:
                part = aiohttp.payload.StringPayload(request.params.bs_type)
                part.set_content_disposition('form-data', name='bs_type')
                mpwriter.append(part)
            part = aiohttp.payload.JsonPayload({
                'id': getattr(detection, 'id', None),
                'features': detection.features,
                'bbox': detection.bbox._asdict(),
                'facen': getattr(detection, 'facen', None),
                'attributes': detection.attributes,
            })
            part.set_content_disposition('form-data', name='detection')
            mpwriter.append(part)
            async with self.session.post(
                    self.ffsec_url + '/video-detector/process',
                    data=mpwriter,
                    headers=headers
            ) as resp:
                await resp.read()
                resp.raise_for_status()
        logger.info("request_id=%r process: ffsecurity accepted event", request.request_id)
async def activate(app, ctx, plugin_name, plugin_source):
    plugin = FFSecurityPlugin(ctx=ctx, ffsec_url=FFSEC_URL)
    return plugin
import datetime
import logging
import aiohttp
from dateutil.tz import tzutc
from facerouter.plugin import Plugin
from ntech import sfapi_client
from ntech.asyncio_utils import wrap_futures
from ntech.asyncio_utils.noop_cookie import NoopCookieJar
from ntech.tornado_utils import asyncio_to_tornado
# change this if your ffsecurity is located on another host or listens on a non-default port
FFSEC_URL = 'http://127.0.0.1:8002'
logger = logging.getLogger(__name__)
class FFSecurityPlugin(Plugin):
    def __init__(self, ctx, ffsec_url):
        super().__init__(ctx)
        self.ffsec_url = ffsec_url.rstrip('/')
        self.session = aiohttp.ClientSession(cookie_jar=NoopCookieJar())
        self.future_wrapper = asyncio_to_tornado
    def deactivate(self, *args):
        self.session.close()
    def request_headers(self, request):
        return {
            "Authorization": request.headers['Authorization'],
            'X-Request-ID': request.request_id,
        }
    @wrap_futures
    async def preprocess(self, request, labels):
        # somewhat hacky way to pass data between preprocess and process:
        request.ffsec_reception_timestamp = datetime.datetime.now(tzutc())
        headers = self.request_headers(request)
        async with self.session.post(self.ffsec_url + '/video-detector/preprocess', headers=headers) as resp:
            resp.raise_for_status()
            resp_json = await resp.json()
            logger.debug("request_id=%r preprocess: ffsecurity response: %r", request.request_id, resp_json)
            plugin_wants = resp_json['plugin_wants']
            request.ffsec_plugin_wants = plugin_wants
            logger.info("request_id=%r preprocess: ffsecurity requested features: %r", request.request_id, plugin_wants)
        return plugin_wants
    @wrap_futures
    async def process(self, request, photo, bbox, event_id, detection: sfapi_client.DetectFace):
        headers = self.request_headers(request)
        with aiohttp.MultipartWriter('form-data') as mpwriter:
            part = aiohttp.payload.BytesPayload(request.params.photo)
            part.set_content_disposition('form-data', name='photo', filename='photo.jpg')
            mpwriter.append(part)
            part = aiohttp.payload.BytesPayload(b'')
            part.set_content_disposition('form-data', name='face0', filename='norm.png')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload(request.params.detectorParams)
            part.set_content_disposition('form-data', name='detectorParams')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload([list(bbox)])
            part.set_content_disposition('form-data', name='bbox')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.params.cam_id)
            part.set_content_disposition('form-data', name='cam_id')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.params.timestamp.isoformat())
            part.set_content_disposition('form-data', name='timestamp')
            mpwriter.append(part)
            part = aiohttp.payload.StringPayload(request.ffsec_reception_timestamp.isoformat())
            part.set_content_disposition('form-data', name='reception_timestamp')
            mpwriter.append(part)
            part = aiohttp.payload.JsonPayload(request.ffsec_plugin_wants)
            part.set_content_disposition('form-data', name='plugin_wants')
            mpwriter.append(part)
            if request.params.bs_type is not None:
                part = aiohttp.payload.StringPayload(request.params.bs_type)
                part.set_content_disposition('form-data', name='bs_type')
                mpwriter.append(part)
            part = aiohttp.payload.JsonPayload({
                'id': getattr(detection, 'id', None),
                'features': detection.features,
                'bbox': detection.bbox._asdict(),
                'facen': getattr(detection, 'facen', None),
                'attributes': detection.attributes,
            })
            part.set_content_disposition('form-data', name='detection')
            mpwriter.append(part)
            async with self.session.post(
                    self.ffsec_url + '/video-detector/process',
                    data=mpwriter,
                    headers=headers
            ) as resp:
                await resp.read()
                resp.raise_for_status()
        logger.info("request_id=%r process: ffsecurity accepted event", request.request_id)
async def activate(app, ctx, plugin_name, plugin_source):
    plugin = FFSecurityPlugin(ctx=ctx, ffsec_url=FFSEC_URL)
    return plugin