Building a ClamAV-Based Microengine


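ClamAV is an open-source, signature-based engine with a daemon that quickly analyzes the artifacts it recognizes. This tutorial uses ClamAV as the analysis backend and walks you step by step through building your second PolySwarm microengine.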

The PolySwarm marketplace will be a source of previously unseen malware.

Relying on a strictly signature-based engine as your analysis backend, particularly one whose signatures everyone can access (e.g. ClamAV), is unlikely to yield unique insight into "swarmed" artifacts and is therefore unlikely to outperform other engines.

This guide should not be taken as a recommendation for how to approach the marketplace but rather an example of how to incorporate an existing analysis backend into a Microengine skeleton.

This tutorial walks you through building microengine/clamav.py. For the completed version, see clamav.py.

clamd Implementation and Integration

Start with a fresh participant-template and give it the engine-name "MyClamAvEngine". You should find a microengine-myclamavengine directory in your current working directory; this is the project we will edit to implement ClamAV functionality.

Edit the __init__.py as described below:

We start our ClamAV analysis backend by importing the clamd module and configuring a few global variables:

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import clamd
import logging
import os
from io import BytesIO

from polyswarmclient.abstractmicroengine import AbstractMicroengine
from polyswarmclient.abstractscanner import AbstractScanner, ScanResult

logger = logging.getLogger(__name__)  # Initialize logger

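CLAMD_HOST = os.getenv('CLAMD_HOST', 'localhost')
CLAMD_PORT = int(os.getenv('CLAMD_PORT', '3310'))
CLAMD_TIMEOUT = 30.0  # Socket timeout in seconds for clamd requests (assumed value; tune as needed)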

Would you believe me if I said we're almost done? Let's initialize the clamd client so that it can communicate with the clamd daemon over a network socket.

class Scanner(AbstractScanner):
    def __init__(self):
        self.clamd = clamd.ClamdAsyncNetworkSocket(CLAMD_HOST, CLAMD_PORT, CLAMD_TIMEOUT)
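Before wiring clamd into the scanner, it can help to confirm that the daemon is actually reachable at CLAMD_HOST:CLAMD_PORT. The sketch below sends clamd's documented PING command over a plain asyncio socket and expects PONG back. It does not use the clamd package at all, and the helper name clamd_ping is hypothetical; it assumes the "z" command prefix, which tells clamd that both the command and its reply are NUL-terminated.

```python
import asyncio


async def clamd_ping(host: str, port: int, timeout: float = 5.0) -> bool:
    """Hypothetical helper: return True if clamd at host:port answers PING with PONG."""
    reader, writer = await asyncio.wait_for(asyncio.open_connection(host, port), timeout)
    try:
        # 'z' prefix: the command is terminated by a NUL byte, and so is clamd's reply
        writer.write(b"zPING\0")
        await writer.drain()
        reply = await asyncio.wait_for(reader.readuntil(b"\0"), timeout)
        return reply.rstrip(b"\0") == b"PONG"
    finally:
        writer.close()
        await writer.wait_closed()
```

For example, `asyncio.run(clamd_ping('localhost', 3310))` should return True when a local clamd is listening on its default TCP port.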

We interact with clamd by sending it a byte stream of the artifact's contents.
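At the protocol level, clamd's INSTREAM command expects those bytes in length-prefixed chunks: each chunk is preceded by its size as a 4-byte unsigned integer in network (big-endian) byte order, and a zero-length chunk ends the stream. The clamd package's instream() handles this framing for you, so the sketch below is only an illustration of what goes over the socket; the function name instream_frames and the 2048-byte chunk size are our own choices. Keep in mind that clamd rejects streams larger than its StreamMaxLength setting.

```python
import struct
from typing import Iterator


def instream_frames(content: bytes, chunk_size: int = 2048) -> Iterator[bytes]:
    """Hypothetical helper: yield the byte frames of a clamd INSTREAM request."""
    yield b"zINSTREAM\0"  # NUL-terminated command ('z' prefix)
    for start in range(0, len(content), chunk_size):
        chunk = content[start:start + chunk_size]
        # 4-byte big-endian length prefix, then the chunk itself
        yield struct.pack("!L", len(chunk)) + chunk
    yield struct.pack("!L", 0)  # zero-length chunk terminates the stream
```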

ClamAV responds to these byte streams in the following form:

{'stream': ('FOUND', 'Eicar-Test-Signature')}

We can easily parse this result with Python's [] operator. In this case, result['stream'][0] is FOUND and result['stream'][1] is Eicar-Test-Signature.

Now all that's left is to write the scan method in the Scanner class.

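    async def scan(self, guid, artifact_type, content, chain):
        # Stream the artifact's raw bytes to clamd and await its response
        result = await self.clamd.instream(BytesIO(content))
        # e.g. {'stream': ('FOUND', 'Eicar-Test-Signature')}
        stream_result = result.get('stream', [])
        if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
            # ClamAV matched one of its signatures
            return ScanResult(bit=True, verdict=True)

        # No signature matched
        return ScanResult(bit=True, verdict=False)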
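One way to exercise this scan logic without a running clamd daemon is to inject a fake client. The sketch below is self-contained, so it does not subclass AbstractScanner and uses a hypothetical stand-in for ScanResult with only the two fields used here; FakeClamd is likewise hypothetical and flags any content containing the marker b"EICAR". Unlike the tutorial's Scanner, this version receives its client through the constructor so a test can swap it.

```python
import asyncio
from dataclasses import dataclass
from io import BytesIO


@dataclass
class ScanResult:
    """Hypothetical stand-in for polyswarmclient's ScanResult (only bit and verdict)."""
    bit: bool = False
    verdict: bool = False


class FakeClamd:
    """Hypothetical fake of the async clamd client; mimics the response shape shown above."""

    async def instream(self, buff: BytesIO) -> dict:
        if b"EICAR" in buff.read():
            return {"stream": ("FOUND", "Eicar-Test-Signature")}
        return {"stream": ("OK", None)}


class Scanner:
    def __init__(self, client) -> None:
        self.clamd = client  # injected, so tests need no clamd daemon

    async def scan(self, guid, artifact_type, content, chain):
        result = await self.clamd.instream(BytesIO(content))
        stream_result = result.get("stream", [])
        if len(stream_result) >= 2 and stream_result[0] == "FOUND":
            return ScanResult(bit=True, verdict=True)
        return ScanResult(bit=True, verdict=False)


async def demo():
    scanner = Scanner(FakeClamd())
    flagged = await scanner.scan("guid-1", None, b"...EICAR...", None)
    clean = await scanner.scan("guid-2", None, b"hello", None)
    return flagged, clean
```

Running `asyncio.run(demo())` returns one result per artifact, which a unit test can compare directly.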

If clamd detects malware, it places FOUND in the first element of the stream tuple (result['stream'][0]).

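The ScanResult object returned by the scan method takes the following constructor arguments to represent our result:

  1. bit: a boolean indicating whether the engine wishes to assert on this artifact
  2. verdict: another boolean representing a malicious (True) or benign (False) determination
  3. confidence: a float representing our confidence in our assertion, ranging from 0.0 to 1.0
  4. metadata: (optional) a string describing the artifact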

We leave including ClamAV's metadata as an exercise for the reader; alternatively, take a look at clamav.py :)
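If you want a starting point for that exercise, one possible approach is to pass the ClamAV signature name through as the metadata string. The sketch below assumes a hypothetical stand-in ScanResult mirroring the four constructor arguments listed above, with defaults (confidence=1.0, metadata='') that are our own assumptions rather than polyswarmclient's; the helper name to_scan_result is also hypothetical.

```python
from dataclasses import dataclass


@dataclass
class ScanResult:
    """Hypothetical stand-in; the defaults here are assumptions, not polyswarmclient's."""
    bit: bool = False
    verdict: bool = False
    confidence: float = 1.0
    metadata: str = ""


def to_scan_result(result: dict) -> ScanResult:
    """Map a clamd instream() response to a ScanResult, keeping the signature name."""
    stream_result = result.get("stream", ())
    if len(stream_result) >= 2 and stream_result[0] == "FOUND":
        # Report ClamAV's signature name as the metadata string
        return ScanResult(bit=True, verdict=True, confidence=1.0, metadata=stream_result[1])
    return ScanResult(bit=True, verdict=False)
```

Inside scan, this would replace the if/return logic with a single `return to_scan_result(result)`.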

The Microengine class is required, but we do not need to modify it, so it is not shown here.

Python 3's asyncio: it is important that any external calls you make during a scan do not block the event loop. We forked the clamd project to add support for Python 3's asyncio. Until our changes are merged upstream, you therefore need to install our python-clamd fork to get an asyncio-capable clamd package for this example to run. The command you need is:

pip install git+https://github.com/polyswarm/python-clamd.git@async#egg=clamd
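If a backend you want to integrate only offers a blocking client, a general asyncio technique (not what this tutorial does; it uses the async fork instead) is to run the blocking call in a worker thread with asyncio.to_thread, available in Python 3.9+. In the sketch below, blocking_scan is a hypothetical stand-in for a synchronous scanner call, and heartbeat counts how often the event loop gets to run while that call is in flight.

```python
import asyncio
import time


def blocking_scan(content: bytes) -> dict:
    """Hypothetical stand-in for a synchronous, blocking scanner call."""
    time.sleep(0.2)  # simulate waiting on a socket
    return {"stream": ("OK", None)}


async def heartbeat(stop: asyncio.Event, ticks: list) -> None:
    """Record a tick each time the event loop schedules this task."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def scan_without_blocking(content: bytes) -> tuple:
    stop = asyncio.Event()
    ticks: list = []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    # Run the blocking call in a worker thread so the loop keeps serving other tasks
    result = await asyncio.to_thread(blocking_scan, content)
    stop.set()
    await beat
    return result, len(ticks)
```

Because the call runs off the event loop, heartbeat keeps ticking for the whole 0.2 seconds; calling blocking_scan directly inside the coroutine would instead starve every other task until it returned.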


Once everything is in place, let's test our participant:

Unit Testing →

Next Steps

In the Eicar example, we showed you how to implement scan logic directly in the Scanner class. In this ClamAV example, we showed you how to call out to an external socket to access scanning logic.

Next, we'll wrap ClamAV and Yara into a single Microengine →