Building a ClamAV-Based Engine / Arbiter
Setting the Stage
ClamAV is an open source signature-based engine with a daemon that provides quick analysis of artifacts that it recognizes. This tutorial will step you through wrapping ClamAV as an Engine / Arbiter analysis backend.
The PolySwarm marketplace will be a source of previously unseen malware.
Relying on a strictly signature-based engine as your analysis backend, particularly one whose signatures everyone can access (e.g. ClamAV), is unlikely to yield unique insight into "swarmed" artifacts and therefore unlikely to outperform other engines.
This guide should not be taken as a recommendation for how to approach the marketplace, but rather an example of how to incorporate an existing analysis backend into an Engine / Arbiter skeleton.
This tutorial will walk the reader through building microengine/clamav.py; please refer to clamav.py for the completed work.
The resultant Scanner class is directly usable in Engines and Arbiters alike.
clamd Implementation and Integration
When you created your participant, participant-template created a Scanner class in your participant's project_slug/package_slug/participant_name_slug.py file*.
We're going to edit the Scanner class in this file.
Scanner subclasses AbstractScanner, which is provided by polyswarm-client.
* These _slug variables that define your directory structure are based on your responses to the cookiecutter prompts.
We begin our ClamAV analysis backend by importing the clamd module and configuring some globals.
import clamd
import logging
import os
from io import BytesIO
from polyswarmclient.abstractscanner import AbstractScanner, ScanResult, ScanMode
logger = logging.getLogger(__name__) # Initialize logger
CLAMD_HOST = os.getenv('CLAMD_HOST', 'localhost')
CLAMD_PORT = int(os.getenv('CLAMD_PORT', '3310'))
CLAMD_TIMEOUT = 30.0
Next, we initialize clamd so that it can communicate with the clamd-daemon over a network socket.
class Scanner(AbstractScanner):
    def __init__(self):
        super(Scanner, self).__init__(ScanMode.ASYNC)
        self.clamd = clamd.ClamdAsyncNetworkSocket(CLAMD_HOST, CLAMD_PORT, CLAMD_TIMEOUT)
We interact with clamd by sending it a byte stream of artifact contents.
ClamAV responds to these byte streams in the form:
{'stream': ('FOUND', 'Eicar-Test-Signature')}
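To make "sending it a byte stream" more concrete, here is a minimal sketch of how clamd's INSTREAM command frames data on the wire: the command, then chunks each prefixed by a 4-byte big-endian length, then a zero-length chunk to finish. The function name frame_instream and the chunk size are illustrative assumptions; the clamd client library does this for you, so this is for understanding only and is not the library's code.

```python
import struct
from io import BytesIO


def frame_instream(stream, chunk_size=1024):
    """Return the bytes a client would write to clamd for one INSTREAM scan.

    frame_instream is a hypothetical helper for illustration only.
    """
    # Null-terminated form of the command ('z' prefix).
    parts = [b'zINSTREAM\0']
    while True:
        chunk = stream.read(chunk_size)
        if not chunk:
            break
        # Each chunk: 4-byte unsigned length in network byte order, then the data.
        parts.append(struct.pack('!L', len(chunk)) + chunk)
    # A zero-length chunk tells clamd the stream is complete.
    parts.append(struct.pack('!L', 0))
    return b''.join(parts)


framed = frame_instream(BytesIO(b'hello'), chunk_size=4)
```

With a chunk size of 4, the five bytes of b'hello' are split into two chunks, followed by the terminating zero-length chunk.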
Now, all we need is to implement the scan_async method in the Scanner class.
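    async def scan_async(self, guid, artifact_type, content, metadata, chain):
        # Stream the artifact bytes to clamd and pull out the 'stream' entry of its reply
        result = await self.clamd.instream(BytesIO(content))
        stream_result = result.get('stream', [])

        # Record the scanner environment and ClamAV version in the Verdict metadata
        # (Verdict must be imported alongside the modules shown earlier)
        vendor = await self.clamd.version()
        metadata = Verdict().set_scanner(operating_system=self.system,
                                         architecture=self.machine,
                                         vendor_version=vendor)

        # ('FOUND', '<family>') means clamd matched a signature
        if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
            metadata.set_malware_family(stream_result[1])
            return ScanResult(bit=True, verdict=True, confidence=1.0, metadata=metadata.json())

        # Otherwise assert that the artifact is benign, with no family name
        metadata.set_malware_family('')
        return ScanResult(bit=True, verdict=False, metadata=metadata.json())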
If clamd detects a piece of malware, it puts FOUND in stream_result[0] and the malware family name in stream_result[1], which we include in our Verdict's metadata.
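The decision rule can be isolated in a small pure function, which is convenient for unit testing without a running daemon. This is a sketch: interpret_clamd_result is a hypothetical helper name, and the ('OK', None) shape used for a clean artifact is an assumption about the client library's reply.

```python
def interpret_clamd_result(result):
    """Map a clamd reply dict to a (is_malicious, malware_family) tuple.

    interpret_clamd_result is a hypothetical helper, not part of polyswarm-client.
    """
    stream_result = result.get('stream', ())
    # A detection looks like ('FOUND', '<family name>').
    if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
        return True, stream_result[1]
    # Anything else (clean reply, missing key) is treated as benign.
    return False, ''


detected = interpret_clamd_result({'stream': ('FOUND', 'Eicar-Test-Signature')})
clean = interpret_clamd_result({'stream': ('OK', None)})  # assumed clean-reply shape
```

Treating a missing 'stream' key as benign mirrors the result.get('stream', []) default used in scan_async; a stricter Engine might prefer to raise or to withhold an assertion in that case.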
We forked the clamd project to add support for Python 3's asyncio.
You'll need to include our fork of python-clamd in your Engine's / Arbiter's requirements.txt file:
git+https://github.com/polyswarm/python-clamd.git@async#egg=clamd
Test Your Participant
Once everything is in place, let's test our participant.
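Before testing against a live clamd daemon, one option is to exercise the scan decision logic with a stub client. The sketch below is not the PolySwarm test harness: FakeClamd, is_malicious, and check_decision_logic are hypothetical names, and the canned replies are assumptions about what the async clamd client returns.

```python
import asyncio
from io import BytesIO


class FakeClamd:
    """Hypothetical stand-in for the async clamd client; no daemon required."""

    def __init__(self, reply):
        self.reply = reply
        self.seen = None

    async def instream(self, buff):
        # Record what was streamed, then return the canned reply.
        self.seen = buff.read()
        return self.reply


async def is_malicious(client, content):
    # Same decision rule as scan_async in this tutorial.
    result = await client.instream(BytesIO(content))
    stream_result = result.get('stream', [])
    return len(stream_result) >= 2 and stream_result[0] == 'FOUND'


def check_decision_logic():
    hit = FakeClamd({'stream': ('FOUND', 'Eicar-Test-Signature')})
    miss = FakeClamd({'stream': ('OK', None)})  # assumed clean-reply shape
    return (asyncio.run(is_malicious(hit, b'sample')),
            asyncio.run(is_malicious(miss, b'sample')))
```

Calling check_decision_logic() runs both cases and returns a pair of booleans, one per stub. A stub like this only checks the branching logic; it says nothing about connectivity to the daemon or about ClamAV's signatures.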
Next Steps
In the Eicar example, we showed you how to implement scan logic directly in the Scanner class. And in this ClamAV example, we showed you how to call out to an external socket to access scanning logic.
Next, we'll wrap ClamAV and Yara into a single Engine.