Example
The following example illustrates the basics of writing a plugin, as well as the use of classes and methods. This plugin requests face features from findface-sf-api
and then sends a request to <FFSEC_URL>/video-detector/process
to create an event with the data obtained from findface-sf-api
.
You can find this plugin at /opt/findface-security/fr_plugin/ffsec_fr_plugin.py
. Embed it as described here and try it out.
Important
Make sure that the FFSEC_URL
variable contains the actual IP address and port of the findface-security
host.
import datetime
import logging
import aiohttp
from dateutil.tz import tzutc
from facerouter.plugin import Plugin
from ntech import sfapi_client
from ntech.asyncio_utils import wrap_futures
from ntech.asyncio_utils.noop_cookie import NoopCookieJar
from ntech.tornado_utils import asyncio_to_tornado
# change this if your ffsecurity is located on another host or listens on a non-default port
FFSEC_URL = 'http://127.0.0.1:8002'
logger = logging.getLogger(__name__)
class FFSecurityPlugin(Plugin):
def __init__(self, ctx, ffsec_url):
super().__init__(ctx)
self.ffsec_url = ffsec_url.rstrip('/')
self.session = aiohttp.ClientSession(cookie_jar=NoopCookieJar())
self.future_wrapper = asyncio_to_tornado
def deactivate(self, *args):
self.session.close()
def request_headers(self, request):
return {
"Authorization": request.headers['Authorization'],
'X-Request-ID': request.request_id,
}
@wrap_futures
async def preprocess(self, request, labels):
# somewhat hacky way to pass data between preprocess and process:
request.ffsec_reception_timestamp = datetime.datetime.now(tzutc())
headers = self.request_headers(request)
async with self.session.post(self.ffsec_url + '/video-detector/preprocess', headers=headers) as resp:
resp.raise_for_status()
resp_json = await resp.json()
logger.debug("request_id=%r preprocess: ffsecurity response: %r", request.request_id, resp_json)
plugin_wants = resp_json['plugin_wants']
request.ffsec_plugin_wants = plugin_wants
logger.info("request_id=%r preprocess: ffsecurity requested features: %r", request.request_id, plugin_wants)
return plugin_wants
@wrap_futures
async def process(self, request, photo, bbox, event_id, detection: sfapi_client.DetectFace):
headers = self.request_headers(request)
with aiohttp.MultipartWriter('form-data') as mpwriter:
part = aiohttp.payload.BytesPayload(request.params.photo)
part.set_content_disposition('form-data', name='photo', filename='photo.jpg')
mpwriter.append(part)
part = aiohttp.payload.BytesPayload(b'')
part.set_content_disposition('form-data', name='normalized', filename='norm.png')
mpwriter.append(part)
part = aiohttp.payload.JsonPayload(request.params.detectorParams)
part.set_content_disposition('form-data', name='detectorParams')
mpwriter.append(part)
part = aiohttp.payload.JsonPayload([list(bbox)])
part.set_content_disposition('form-data', name='bbox')
mpwriter.append(part)
part = aiohttp.payload.StringPayload(request.params.cam_id)
part.set_content_disposition('form-data', name='cam_id')
mpwriter.append(part)
part = aiohttp.payload.StringPayload(request.params.timestamp.isoformat())
part.set_content_disposition('form-data', name='timestamp')
mpwriter.append(part)
part = aiohttp.payload.StringPayload(request.ffsec_reception_timestamp.isoformat())
part.set_content_disposition('form-data', name='reception_timestamp')
mpwriter.append(part)
part = aiohttp.payload.JsonPayload(request.ffsec_plugin_wants)
part.set_content_disposition('form-data', name='plugin_wants')
mpwriter.append(part)
if request.params.bs_type is not None:
part = aiohttp.payload.StringPayload(request.params.bs_type)
part.set_content_disposition('form-data', name='bs_type')
mpwriter.append(part)
part = aiohttp.payload.JsonPayload({
'id': getattr(detection, 'id', None),
'features': detection.features,
'bbox': detection.bbox._asdict(),
'facen': getattr(detection, 'facen', None),
'attributes': detection.attributes,
})
part.set_content_disposition('form-data', name='detection')
mpwriter.append(part)
async with self.session.post(
self.ffsec_url + '/video-detector/process',
data=mpwriter,
headers=headers
) as resp:
await resp.read()
resp.raise_for_status()
logger.info("request_id=%r process: ffsecurity accepted event", request.request_id)
async def activate(app, ctx, plugin_name, plugin_source):
plugin = FFSecurityPlugin(ctx=ctx, ffsec_url=FFSEC_URL)
return plugin