改良の詳細

RedisPubSubと関連するDAQMWコンポーネント

RedisPubSubパッケージ

RedisPubSubは、分散メッセージミドルウエアであるRedisのPUBLISHやSUBSCRIBEコマンドを利用して、いわゆるPublish/Subscribeモデルを実現させたJ-PARC/MLF用のライブラリである。 Redisはオープンソースで、インメモリ型のデータストアであり、データベースやキャッシュやメッセージブローカーとして利用されている。 RedisPubSubは、MLFにおける分散システムを念頭に、必要な機能を実現させるべく、C++言語で実装された。

DAQMWでは分散オブジェクト技術であるCORBA技術を元にオブジェクトをシリアライズしてバイナリーデータとして転送していた。 RedisPubSubでは基本的に文字列を含むバイナリーデータを転送する。 バイナリーにはシリアライズされたオブジェクトも入れることができる。 また文字列にはXML/JSONなどの構造化テキストをオブジェクトからシリアライズして入れることができる。 従って、RedisPubSubは、オブジェクト転送と遜色ない機能を実現させるポテンシャルを持つ。

RedisPubSubには、情報を発信するpublish機能と情報を受け取るsubscribe機能がある。 チャネル名(キー名)に対応した値、キー・バリューでデータが扱われる。 Publisherは特定のキーにデータを付随してRedisサーバに送ると、Redisサーバはそれを保持し、同じキー名でデータがくるたびに、キユーに貯める。 一方、Subscriberはあらかじめそのキー名をsubscribeして置くと、Redisサーバではデータが来ると、それが入ってきた順に同じデータを複数のSubscriberは読み出すことができる。 そのキー名の特定のデータは、全てのSubscriberが読み出し終わると、Redisサーバから消去される。

このように、Redisサーバは、データの送受信を仲介するメッセージブローカーとして機能する。

PublisherMlfコンポーネント

このコンポーネントは、DispatcherMlf等、前段のDAQMWコンポーネントからデータを受け取り、Redisサーバにチャネル名(キー名)を持って、そのままpublishする。 publishするデータの単位は、送られてくるデータの単位と同じである。 publishされたデータは、すでにsubscribeしているSubscriberに送られる。 生データのキー名の命名規則は下記の通り。

AAA:edb:AAABBBBBB_CC_DDD_edb
AAAは、装置名で3文字、例えばNVA
BBBBBBはラン番号で6文字の数字、例えば012345
CCはDAQ IDで2文字の数字、例えば00
DDDはModule Numberで3文字の数字、例えば000

このコンポーネントに与えるパラメータは下記の通り。

<params>
  <param pid="daqId">0</param>
  <param pid="infoServerHost">localhost</param>
  <param pid="infoServerPort">6379</param>
  <param pid="isPublishing">yes</param>
  <param pid="srcAddr">192.168.0.16</param>
  <param pid="instId">NVA</param>
  <param pid="eventByteSize">8</param>
</params>

infoServerHostは、Redisサーバのホスト名
infoServerPortは、Redisサーバの使用するポート番号

T0EventPublisherMlfコンポーネント

このコンポーネントは、T0EventLoggerMlfがT0インデックスをファイルに書く機能を持つのに対して、RedisサーバにT0インデックスをpublishする機能を持つ。 T0データの命名規則は下記の通り。生データのedbという文字をt0bに替えたものである。

AAA:t0b:AAABBBBBB_CC_DDD_t0b

このコンポーネントに与えるパラメータは下記の通り。

<params>
   <param pid="daqId">0</param>
  <param pid="infoServerHost">localhost</param>
  <param pid="infoServerPort">6379</param>
  <param pid="srcAddr">192.168.0.16</param>
  <param pid="isPublishing">yes</param>
  <param pid="instId">NVA</param>
  <param pid="eventByteSize">8</param>
</params>

infoServerHostは、Redisサーバのホスト名
infoServerPortは、Redisサーバの使用するポート番号

DaqInfoPublisherコンポーネント

このコンポーネントは、SubscriberがDaqOperatorからのBegin/Endのタイミングを取得するために作られた。 Subscriberはデータを読むためにBeginの情報やデータ処理を終えるためにEndの情報が必要になる場合がある。 このコンポーネントはBeginやEndの情報をRedisサーバに投げることで、Subscriberがそれらをsubscribeして情報を取得することができるので、そのタイミングを取得できるようになる。

チャネル名(キー名)を宣言する。下記の例では、NVA:DaqInfo:string:DaqCommandがチャネル名となる。

<param pid="keyNameDaqCommand">NVA:DaqInfo:string:DaqCommand</param>

このキー名で送られるメッセージ内容は当面beginやendなどの簡単なものである。

GathererPsdPubコンポーネント

このコンポーネントは、GathererPsdコンポーネントにFIFOオーバーフロー情報をRedisサーバに投げるコードが追加されたものである。 publishされるメッセージのフォーマットはjson形式である。 Redisサーバから取り出して処理するプロセスがPythonなら、jsonモジュールのloadsメソッドでそれらのテキストをオブジェクトに簡単に変換できる。 GathererPsdには、すでにFIFO読み出しのスレッドを走らせてFIFOオーバーフローなどの情報を定期的に読み出せる仕組みがある。

FIFOオーバーフローなどの値を常時モニターするためには1分に1回とかに情報を読み出す必要がある。 Stopwatchクラスで、そのタイミングを知り送り出すことが出来る。

MakeJsonクラスは、簡単なjsonストリングを作成する。 それを使って、下記のようなjsonストリングを作り、Redisサーバに送る。

{"time":"2017-01-20T10:11:43JST","daq_id":0,"data":[["192.168.0.16",444444,0],["192.168.0.17",33333,0],["192.168.0.18",22222,0]]}

DAQMW設定の変更

RedisPubSubの追加により、xinetdにサービスに関わる/etc/xinetd.d/bootCompsの一部の変更が必要となる。 下記のように、envパラメータを追加すること。

service bootComps
{
      |
   env             = LD_LIBRARY_PATH=/opt/mlfsoft/daqmw/lib64
}

変更後は、下記のようにxinetdの再起動が必要である。
CentOS7では、

sudo systemctl restart xinetd

CentOS6では、

sudo service xinetd restart

nGEM用GathererであるGathererPushコンポーネントの追加

このコンポーネントは、中性子モニタ用検出器であるnGEMのDAQMWコンポーネントとして使われていたNgemMlfコンポーネントの置き換えである。

置き換えの理由は、NgemMLFが「一人ぼっち」コンポーネントとして下流にデータを流さないので、PublishMlfコンポーネントを使ってオンラインデータ解析ができないためである。

nGEMはNeuNETと違って、データは一度接続されると、いわゆる「垂れ流し」でデータが送られる。 NeuNETはリクエストを受けて要求に応じてデータを流す。データフローの視点から、nGEMはpush型であり、NeuNETはpull型である。 push型の読み出しモジュールをサポートするという意味でGathererPushという名前をつけた。

日付問題への対策

改良されたコンポーネント

この改良のコードを選択できるようにするために#ifdef COMMON_DATE文を入れた。有効にするためにはファイルLoggerMlf/FileUtils.hの中の、すでに挿入してある下記のコードを確認する。

#define COMMON_DATE

無効にするためにはコメント文にする。

//#define COMMON_DATE

この選択肢を設けた理由は、今までにすでにインストールされているDAQMWでも、これらのコンポーネントを走らせることができるようにするためである。 新しいDAQMWへのアップデートが難しい場合や日付問題への対策を必要としない装置グループでも、これらのコンポーネントを利用できる。

LoggerMlf

基本的な変更箇所は、自分の計算機のdateを取得する部分を、DaqOperatorが提供するdateを取得するように変更したところである。 主にFileUtilsが改良された。ParameterClient(LoggerMlf)/ ParameterServer(DaqOperator)機能を使って取得する。

KickerMlf

以前のKickerはもともとLoggerMlfのFileUtilsを使用していたので、ほとんど変更はない。名前はLoggerMlfとの名前の整合性のためKickerMlfとした。

Logger/Kickerのアルゴリズムの改良

従来のLogger/Kickerのdaq_run()メソッドのアルゴリズムで、無限ループを作っていた問題。 以前より、「Kickerコンポーネントによって読み出されたT0カウントがIROHA2でなかなか更新されない状態があった。」理由は、これにある。 DAQMWの作法で、daq_run()の中で無限ループを作るのはご法度。永遠に抜け出れなくなり、コマンドを受け付けなくなるので。 つまり、T0カウントを読み出すためのコマンドが受け付けられにくい状態になっていた。

これを改良するために、無限ループをなくし、いつでもコマンドが受け付けられるようにした。

DAQ終了アルゴリズムの追加

DaqOperatorからのEndコマンドを受け取っていても、後段のコンポーネントは、Gathererの読み出し終了タイミングを知る確実な方法がなかった。 従って、後段のコンポーネントは読み出しのタイムアウトを繰り返し、Gathererからのデータが来ないこととするアルゴリズムを取らざるを得なかった。 しかし、データの最後を示すマークつけたデータを処理するアルゴリズムを実装することで、この問題を解決した。

GathererPushに実装したメソッドでの例を挙げると、下記の通り。

int set_header(unsigned char* header, unsigned daqId, unsigned ModNo, unsigned data_byte_size);
void set_end_mark(unsigned char* footer);

これらを利用している。

また、LoggerMlfに実装したメソッドの例を挙げると、下記の通り。

bool check_end_mark(unsigned char* footer);

これを利用している。

この改良のコードを選択できるようにするために#ifdef ENDFLAG文を入れた。有効にするためにはファイルLoggerMlf/LoggerMlf.hの中の、すでに挿入してある下記のコードを確認する。

#ifdef ENDFLAG

無効にするためにはコメント文にする。

//#ifdef ENDFLAG

有効を選択した場合でも、データの最後を示すマークを持つデータが来なかった場合、無効を選択した場合と同じように処理される。