PolySwarmPolySwarmPolySwarmPolySwarm
Go to PolySwarm
Home

Build Your First Engine

Let's make our trivial EICAR Engine a little more interesting. How about we use the ClamAV scanning engine to scan FILE artifacts.


What is ClamAV?

ClamAV is an open source signature-based antivirus engine with a daemon that provides quick analysis of artifacts. This tutorial will step you through wrapping ClamAV as an Engine.

The PolySwarm marketplace will be a source of previously unseen malware.

Relying on a strictly signature-based engine as your analysis backend, particularly one whose signatures everyone can access (e.g. ClamAV) is unlikely to yield unique insight into artifacts and therefore unlikely to outperform other engines.

This guide should not be taken as a recommendation for how to approach the marketplace, but rather an example of how to incorporate an existing analysis backend into an Engine skeleton.

This tutorial will walk the reader through building microengine-clamav; please refer to that repo for the completed work.


Adding ClamAV to Your Engine Template

ClamAV is its own service that runs externally from our Engine. In order to scan with ClamAV, scan() has to reach out to the service and send the FILE artifacts to the service. There is an existing Python library capable of talking to ClamAV, called clamd. We will use that to complete this Engine. Let's get it installed first.

Add clamd to requirements.txt. Then install the requirements, again.

bump2version
celery~=4.4.7
clamd~=1.0.2
Flask~=1.1.2
polyswarm-artifact~=1.4.2
pytest~=5.4.2
pytest-cov~=2.10.1
pytest-mock~=3.3.1
python-json-logger~=2.0.1
requests~=2.22.0
requests-mock~=1.8.0
(psvenv) $ pip install -r requirements.txt

Import the newly installed clamd package in scan.py. The clamd library uses the class clamd.ClamdNetworkSocket to connect to the clamd service. The constructor requires host, port, and timeout parameters, so let's add CLAMD_HOST, CLAMD_PORT, and CLAMD_TIMEOUT environment variables to settings.py

BROKER = os.environ.get('CELERY_BROKER_URL')
WEBHOOK_SECRET = os.environ.get('WEBHOOK_SECRET')

MAX_BID_RULE_NAME = os.environ.get('MAX_BID_RULE_NAME', 'max_allowed_bid')
MIN_BID_RULE_NAME = os.environ.get('MIN_BID_RULE_NAME', 'min_allowed_bid')
DEFAULT_MAX_BID = os.environ.get('DEFAULT_MAX_BID', to_wei(1))
DEFAULT_MIN_BID = os.environ.get('DEFAULT_MIN_BID', to_wei(1) / 16)

CLAMD_HOST = os.getenv('CLAMD_HOST', 'localhost')
CLAMD_PORT = int(os.getenv('CLAMD_PORT', '3310'))
CLAMD_TIMEOUT = float(os.getenv('CLAMD_TIMEOUT', '30.0'))

Now that the project's settings.py has the configuration variables to connect to the ClamAV service, it's time to override scan(). For Engines that process FILE artifacts, a rule of thumb is to start with downloading the file. To download the file, add content = bounty.fetch_artifact(). For Engines that process URL artifacts, we can use bounty.artifact_uri directly to get the scan target URL.

After downloading the file, setup a connection to the ClamAV service by adding:

clamd_socket = clamd.ClamdNetworkSocket(settings.CLAMD_HOST, settings.CLAMD_PORT, settings.CLAMD_TIMEOUT).

The clamd_socket isn't an open socket at all. Instead, it opens and closes sockets as requests are made.

In order for ClamAV to scan the file, it needs the file. clamd provides a means to stream the file to the ClamAV service with instream(). The only problem is that it takes a file-like object, not bytes. An easy way to turn bytes into a file-like object is with io.BytesIO(content). Combine them and store the result by adding:

result = clamd_socket.instream(io.BytesIO(content))

Streaming a malicious file to the ClamAV service yields a result containing a dict like this example:

{'stream': ('FOUND', 'Eicar-Test-Signature')}.

If the file is malicious, the key stream will hold a Tuple with the string FOUND and the malware family. To get our result data, we want to Get the value at the key stream, like this:

stream_result = result.get('stream', []).

If stream_result has 2 values in the tuple, and that the first is FOUND, it is malicious. We can easily check for that like this:

if len(stream_result) >= 2 and stream_result[0] == 'FOUND':

Since it is malicious, we want to grab the malware family and return a ScanResult, like this:

return ScanResult(verdict=Verdict.MALICIOUS, metadata=ScanMetadata().set_malware_family(stream_result[1]))

Otherwise, it is benign, so return benign, like this:

return ScanResult(verdict=Verdict.BENIGN)

When we put it all together here is our complete scan() function:

def scan(bounty: Bounty) -> ScanResult:
    content = bounty.fetch_artifact()
    clamd_socket = clamd.ClamdNetworkSocket(settings.CLAMD_HOST, settings.CLAMD_PORT, settings.CLAMD_TIMEOUT)
    result = clamd_socket.instream(io.BytesIO(content))
    stream_result = result.get('stream', [])
    if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
        return ScanResult(verdict=Verdict.MALICIOUS, metadata=ScanMetadata().set_malware_family(stream_result[1]))
    else:
        return ScanResult(verdict=Verdict.BENIGN)

Next Steps

Now we have a working Engine, let's run the unit tests.

2025 © PolySwarm Pte. Ltd.