Building a ClamAV-Based Microengine


ClamAV は、オープン・ソースのシグネチャー・ベースのエンジンであり、認識したアーティファクトを迅速に分析するデーモンを備えています。 このチュートリアルでは、ClamAV を分析バックエンドとして取り込むことで、2 つ目の PolySwarm マイクロエンジンを作成するプロセスについて順に説明します。

The PolySwarm marketplace will be a source of previously unseen malware.

Relying on a strictly signature-based engine as your analysis backend, particularly one whose signatures everyone can access (e.g. ClamAV) is unlikely to yield unique insight into "swarmed" artifacts and therefore unlikely to outperform other engines.

This guide should not be taken as a recommendation for how to approach the marketplace but rather an example of how to incorporate an existing analysis backend into a Microengine skeleton.

このチュートリアルでは、microengine/clamav.py の作成について説明します。完成したソリューションについては、clamav.py をご覧ください。

clamd の実装と統合

Start with a fresh participant-template, give it the engine-name of "MyClamAvEngine". 現行作業ディレクトリーに microengine-myclamavengine があり、これを編集して ClamAV スキャンの機能を実装します。

Edit the __init__.py as we describe below:

clamd モジュールをインポートして、グローバル変数を構成することで、ClamAV 分析バックエンドを開始します。

#!/usr/bin/env python
# -*- coding: utf-8 -*-
import clamd
import logging
import os
from io import BytesIO

from polyswarmclient.abstractmicroengine import AbstractMicroengine
from polyswarmclient.abstractscanner import AbstractScanner, ScanResult

logger = logging.getLogger(__name__)  # Initialize logger

CLAMD_HOST = os.getenv('CLAMD_HOST', 'localhost')
CLAMD_PORT = int(os.getenv('CLAMD_PORT', '3310'))

これでもうすぐ完了と言ったら信じられますか? ネットワーク・ソケットを介して clamd-daemon と通信できるように、clamd を初期化して実行しましょう。

class Scanner(AbstractScanner):
    def __init__(self):
        self.clamd = clamd.ClamdAsyncNetworkSocket(CLAMD_HOST, CLAMD_PORT, CLAMD_TIMEOUT)

clamd の操作は、アーティファクト・コンテンツのバイト・ストリームを送信して行います。

ClamAV は、このバイト・ストリームに対して以下の形式で応答します。

{'stream': ('FOUND', 'Eicar-Test-Signature')}

Python の [] 演算子を使用して簡単に結果を解析できます。 result[0] は単語「FOUND」であり、この例の result[1] は「Eicar-Test-Signature」です。

これで、後は Scanner クラスに scan メソッドを実装するだけです。

    async def scan(self, guid, artifact_type, content, chain):
        result = await self.clamd.instream(BytesIO(content))
        stream_result = result.get('stream', [])
        if len(stream_result) >= 2 and stream_result[0] == 'FOUND':
            return ScanResult(bit=True, verdict=True)

        return ScanResult(bit=True, verdict=False)

clamd は、マルウェアを検出すると、result[0] に「FOUND」を入れます。

scan メソッドで返される ScanResult オブジェクトのコンストラクターは、結果を表す以下のパラメーターを取ります。

  1. bit : malicious (悪意がある) か benign (無害) かの判定を表す boolean
  2. verdict: エンジンでアーティファクトに関するアサーションを出すかどうかを表す boolean
  3. confidence: アサーションの信頼度を表す float (0.0 から 1.0)
  4. metadata: (optional) string describing the artifact

練習問題として ClamAV の metadata の組み込みを行ってください。あるいは、clamav.py を確認してください。

The Microengine class is required, but we do not need to modify it, so it is not shown here.

Python 3's Asyncio - It is important that any external calls you make during a scan do not block the event loop. We forked the clamd project to add support for python 3's asyncio. Thus, for this example to run, you need install our python-clamd project to get the clamd package until our changes are merged upstream. The command you need is:

pip install git+https://github.com/polyswarm/python-clamd.git@async#egg=clamd.


Once everything is in place, let's test our participant:

Unit Testing →

Next Steps

In the Eicar example, we showed you how to implement scan logic directly in the Scanner class. And in this ClamAV example, we showed you how to call out to an external socket to access scanning logic.

Next, we'll wrap ClamAV and Yara into a single Microengine →