Building a ClamAV-Based Microengine


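ClamAV is an open-source, signature-based engine with a daemon that quickly analyzes artifacts it recognizes. This tutorial walks you step by step through building your second PolySwarm Microengine, using ClamAV as the analysis backend.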

The PolySwarm marketplace will be a source of previously unseen malware.

Relying on a strictly signature-based engine as your analysis backend, particularly one whose signatures everyone can access (e.g. ClamAV), is unlikely to yield unique insight into "swarmed" artifacts and is therefore unlikely to outperform other engines.

This guide should not be taken as a recommendation for how to approach the marketplace but rather an example of how to incorporate an existing analysis backend into a Microengine skeleton.

This tutorial walks you through building microengine/clamav.py. For the completed version, see clamav.py.

Implementing and Integrating clamd

Start with a fresh participant-template and give it the engine-name "MyClamAvEngine". You should find a microengine-myclamavengine directory in your current working directory; this is where we will implement the ClamAV functionality.

Edit __init__.py as described below:

We begin our ClamAV analysis backend by importing the clamd module and configuring a few global variables.

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import clamd
import logging
import os
from io import BytesIO

from polyswarmclient.abstractmicroengine import AbstractMicroengine
from polyswarmclient.abstractscanner import AbstractScanner, ScanResult

logger = logging.getLogger(__name__)  # Initialize logger

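# clamd connection settings; host and port can be overridden via environment variables
CLAMD_HOST = os.getenv('CLAMD_HOST', 'localhost')
CLAMD_PORT = int(os.getenv('CLAMD_PORT', '3310'))
CLAMD_TIMEOUT = 30.0  # socket timeout in seconds, passed to the clamd client in Scanner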

Would you believe me if I said we're almost done? Let's initialize clamd so that it can communicate with the clamd daemon over a network socket.

class Scanner(AbstractScanner):
    def __init__(self):
        # Async client from PolySwarm's python-clamd fork; talks to clamd over TCP
        self.clamd = clamd.ClamdAsyncNetworkSocket(CLAMD_HOST, CLAMD_PORT, CLAMD_TIMEOUT)
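Before wiring the Scanner into a participant, it can help to confirm that clamd is actually reachable at CLAMD_HOST:CLAMD_PORT. clamd documents a PING command that it answers with PONG. The sketch below sends that command using plain asyncio streams, so it assumes nothing about the fork's API; `clamd_is_alive` is a hypothetical helper, and its 5-second default timeout is an arbitrary choice.

```python
import asyncio


async def clamd_is_alive(host: str, port: int, timeout: float = 5.0) -> bool:
    """Hypothetical health check: send clamd's PING command, expect PONG."""
    try:
        reader, writer = await asyncio.wait_for(
            asyncio.open_connection(host, port), timeout)
    except (OSError, asyncio.TimeoutError):
        return False  # nothing listening, or the connection attempt timed out
    try:
        writer.write(b"zPING\0")  # 'z' prefix: the command is NUL-terminated
        await writer.drain()
        reply = await asyncio.wait_for(reader.read(16), timeout)
        return reply.rstrip(b"\0\n") == b"PONG"
    except (OSError, asyncio.TimeoutError):
        return False
    finally:
        writer.close()
        try:
            await writer.wait_closed()
        except OSError:
            pass
```

For example, `asyncio.run(clamd_is_alive('localhost', 3310))` returns True only when a daemon on that port replies PONG.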

We interact with clamd by sending it a byte stream of the artifact's content.
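To make "sending a byte stream" concrete, here is a minimal sketch of the framing that clamd's INSTREAM command expects, as described in the clamd documentation: a `zINSTREAM\0` command, then chunks each prefixed by a 4-byte big-endian length, then a zero-length chunk that ends the stream. The client library handles this for you; `frame_instream` is a hypothetical helper for illustration only and does not open a socket. clamd also enforces a maximum stream size (its StreamMaxLength setting), which this sketch does not check.

```python
import struct


def frame_instream(content: bytes, chunk_size: int = 1024) -> bytes:
    """Hypothetical helper: build the full INSTREAM request for `content`."""
    if chunk_size <= 0:
        raise ValueError("chunk_size must be positive")
    parts = [b"zINSTREAM\0"]  # NUL-terminated command
    for offset in range(0, len(content), chunk_size):
        chunk = content[offset:offset + chunk_size]
        parts.append(struct.pack("!L", len(chunk)))  # 4-byte big-endian length
        parts.append(chunk)
    parts.append(struct.pack("!L", 0))  # zero-length chunk terminates the stream
    return b"".join(parts)


request = frame_instream(b"hello world", chunk_size=4)
```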

ClamAV responds to these byte streams in the following form:

{'stream': ('FOUND', 'Eicar-Test-Signature')}

We can easily parse the result using Python's [] operator: result['stream'][0] is FOUND, and result['stream'][1] is, in this case, Eicar-Test-Signature.
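To our understanding, the dictionary above is what the client library builds from clamd's raw text reply, which looks like `stream: Eicar-Test-Signature FOUND` (or `stream: OK` for a clean artifact). The following sketch shows, under that assumption, how such a reply maps onto the `{'stream': (status, signature)}` shape and how the signature name can be pulled out of it; `parse_clamd_reply` and `detected_signature` are hypothetical names, not part of the clamd package.

```python
import re
from typing import Optional

# "<path>: [<signature> ]<STATUS>", where STATUS is FOUND, OK or ERROR
_REPLY = re.compile(r"^(?P<path>.*): ((?P<reason>.+) )?(?P<status>FOUND|OK|ERROR)$")


def parse_clamd_reply(raw: bytes) -> dict:
    """Hypothetical parser: turn clamd's reply into {path: (status, reason)}."""
    text = raw.decode("utf-8", errors="replace").rstrip("\0\n")
    match = _REPLY.match(text)
    if match is None:
        raise ValueError(f"unrecognized clamd reply: {text!r}")
    # reason is None when the reply carries no signature (e.g. "stream: OK")
    return {match.group("path"): (match.group("status"), match.group("reason"))}


def detected_signature(result: dict) -> Optional[str]:
    """Return the signature name if clamd reported FOUND, else None."""
    stream_result = result.get("stream", ())
    if len(stream_result) >= 2 and stream_result[0] == "FOUND":
        return stream_result[1]
    return None
```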

Now all that's left is to write the scan method in the Scanner class.

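    async def scan(self, guid, artifact_type, content, chain):
        # Stream the artifact's bytes to clamd without blocking the event loop
        result = await self.clamd.instream(BytesIO(content))
        # Expected shape: {'stream': ('FOUND', '<signature>')} on a detection
        stream_result = result.get('stream', [])
        if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
            # clamd matched a signature: report a malicious verdict
            return ScanResult(bit=True, verdict=True)

        # No signature matched: report a benign verdict
        return ScanResult(bit=True, verdict=False)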

If clamd detects malware, it places FOUND in the first element of the stream result (result['stream'][0]).

The constructor of the ScanResult object returned by the scan method takes the following parameters to represent our result:

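  1. bit: a boolean indicating whether the engine wants to assert on this artifact
  2. verdict: another boolean indicating whether the artifact is malicious (True) or benign (False)
  3. confidence: a float from 0.0 to 1.0 expressing how confident we are in our assertion
  4. metadata: (optional) a string describing the artifact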

We leave including ClamAV's metadata as an exercise for the reader, or you can check clamav.py :)
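One possible way to approach that exercise, sketched under assumptions: map each clamd status onto the four ScanResult fields and pass the signature name through as metadata. `ScanResultSketch` is a hypothetical stand-in that only mirrors the parameter names listed above (it is not the polyswarmclient class), and declining to assert (`bit=False`) when clamd reports ERROR or returns nothing usable is our design choice, not something the scan method above does.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class ScanResultSketch:
    """Hypothetical stand-in mirroring ScanResult's parameters."""
    bit: bool = False              # assert on this artifact?
    verdict: bool = False          # malicious (True) or benign (False)
    confidence: float = 1.0        # 0.0 .. 1.0 (default assumed here)
    metadata: Optional[str] = None


def to_scan_result(result: dict) -> ScanResultSketch:
    # Pad so a missing or short 'stream' entry still unpacks cleanly
    status, reason = (tuple(result.get("stream", ())) + (None, None))[:2]
    if status == "FOUND":
        # Signature match: malicious, with the signature name as metadata
        return ScanResultSketch(bit=True, verdict=True, metadata=reason)
    if status == "OK":
        return ScanResultSketch(bit=True, verdict=False)
    # ERROR or anything unexpected: do not assert at all
    return ScanResultSketch(bit=False, verdict=False, metadata=reason)
```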

The Microengine class is required, but we do not need to modify it, so it is not shown here.

Python 3's asyncio: it is important that any external calls you make during a scan do not block the event loop. We forked the clamd project to add support for Python 3's asyncio, so until our changes are merged upstream, you need to install our python-clamd fork to get a clamd package that can run this example. The command you need is:

pip install git+https://github.com/polyswarm/python-clamd.git@async#egg=clamd
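If you would rather not depend on the fork, one alternative (not the approach this tutorial takes) is to keep a synchronous client, such as upstream python-clamd's `ClamdNetworkSocket`, and run its blocking call in a thread pool with `loop.run_in_executor`, so the event loop stays responsive. In the sketch, `blocking_instream` is a hypothetical stand-in that simulates a slow scan instead of contacting clamd.

```python
import asyncio
import time


def blocking_instream(content: bytes) -> dict:
    """Hypothetical stand-in for a synchronous clamd client call."""
    time.sleep(0.1)  # simulate blocking network I/O
    if b"EICAR" in content:
        return {"stream": ("FOUND", "Eicar-Test-Signature")}
    return {"stream": ("OK", None)}


async def scan_without_blocking(content: bytes) -> dict:
    loop = asyncio.get_running_loop()
    # Run the blocking call in the default ThreadPoolExecutor; the event loop
    # keeps serving other coroutines while the worker thread waits.
    return await loop.run_in_executor(None, blocking_instream, content)


async def main() -> list:
    # Both scans are in flight at the same time
    return await asyncio.gather(
        scan_without_blocking(b"sample containing an EICAR marker"),
        scan_without_blocking(b"harmless bytes"),
    )


results = asyncio.run(main())
```

Because both scans run in worker threads, `asyncio.gather` lets them overlap instead of running back to back; the trade-off is one pool thread tied up per in-flight scan.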


Once everything is in place, let's test our participant:

Unit Testing →

Next Steps

In the Eicar example, we showed you how to implement scan logic directly in the Scanner class. In this ClamAV example, we showed you how to call out to an external service over a socket to access scanning logic.

Next, we'll wrap ClamAV and Yara into a single Microengine →