Clojure, Elixirでプロセス間通信 〜TCP通信でBF,UDS通信でなんでも掲示板〜
これは,traP Advent Calender 2016 7日目の記事です。
はじめに
Double_oxygeNです。
あらゆる言語が好きな人です。
早速ですが,まずは今回の記事の主役,ClojureとElixirについて簡単に説明したいと思います。
Clojure | Elixir | |
---|---|---|
登場年 | 2007 | 2012 |
実行環境 | Java VM(JVM) | Erlang VM(BEAM) |
文法 | Lispっぽい | Rubyっぽい |
ビルドツール | leiningen | mix (leiningenを参考にしている) |
Clojure/Elixirはいいぞ。
TCP通信でBF
これから2つの制作物を紹介しますが,どちらもClojureやElixirを用いてプロセス間通信(IPC)をしてみたという点で共通しています。
このような言語を紹介する時は大抵,immutable(不変)なデータ構造や特徴的な並行処理技法をよく引き合いに出されます。
その実践例と考えていただければと思います。
まず最初の制作ですが,TCP通信を用いたBrainfインタプリタを作りました*。
はっきり言ってしまうとこの制作物自体は無駄です。
しかし,制作の過程で色々な技術を用いているので,それを主に紹介しようと思います。
設計
サーバサイドがコードを保管し,それをクライアントサイドで動かす,というような仕組みを考えます。
サーバサイドは,その耐障害性を試すためにもElixirを採用します。
一方クライアントサイドは,その機能を考えてClojureを採用しました。
違う言語同士で齟齬なく通信ができるかを試す意味でも,このモデルは適していると考えました。
サーバサイド
ターミナルで
$ mix new --sup brain
と入力することで,新規プロジェクトの雛形が完成します。
––sup オプションは,スーパーバイザ込みの雛形を作成することを意味します。
スーパーバイザとは何でしょうか?
ElixirはErlangから作られた言語ですが,そのErlangに組み込まれているOTP(Open Telecom Platform; 便利機能を使いやすくしたフレームワークのようなもの)の一つとしてスーパーバイザというものがあります。
具体的には,スーパーバイザはある別のプロセスを監視下に入れ,もし実行中に監視下のプロセスでエラーが起きたら,自動でリセットして再起動するというものです。
この機能は主に通信用に作られたErlangには重要です。
分散処理のようなシミュレーションの難しいプログラムでは,予期せぬエラーが発生してしまうのは避けようがありません。
しかし,そのためにサーバがいちいちダウンしてしまうようでは,サービスの信頼性の低下につながります。
再起動してサービスを継続させつつ,バグにはホットスワップで対応させるということができると,利用者を困らせることなくサービスの品質を向上させることができるのです。
以下,実際のコードを抜萃しながら説明していきます。
(Elixirがシンタックスハイライトに対応していなかったので,Rubyのハイライトを用いています。あしからず。)
brain/lib/ には,既にmixによって生成されたbrain.exが置いてあります。
これでもうスーパーバイザはほとんど完成していて,あとは管理下に置くワーカープロセスを登録するだけです。
brain/lib/ にworker.exを作成します。ここには,サーバー側が命令からどのような処理をするかを書いていきます。
GenServerと呼ばれるOTPを用いて,単純なメッセージパッシングによる並行プロセスを作ります。
useしてGenServerの使用を宣言します。
use GenServer
init/1 では,最初のサーバの状態を決定します。タプルの第二要素がそれです。今回は,初期化時に渡される引数start_argsをそのまま初期値とします。
(Elixirでは,関数の引数の個数を関数名の後に書くのが決まりになっています。)
def init(start_args) do
{:ok, start_args}
end
handle_call/3 は,GenServer.call/2 が呼び出された時に処理を行い,メッセージが送られてきたら,状態を変更させつつ何らかの値(ここでは,現在の状態state)を送り返します。
def handle_call(:show, _from, state) do
{:reply, state, state}
end
handle_cast/2 はhandle_call/3 に似ていますが,GenServer.cast/2 が呼び出された時に処理を行い,状態は変更させますが値は送り返しません。
def handle_cast({:push, e}, state) do
{:noreply, e}
end
これらにメッセージを送るのは,下記の部分です。
def start_link(args) do
GenServer.start_link(__MODULE__, args, name: __MODULE__)
end
def show() do
GenServer.call(__MODULE__, :show)
end
def push(e) do
GenServer.cast(__MODULE__, {:push, e})
end
GenServerの関数はそれぞれ,第一引数が送り先を,第二引数が送るメッセージを表しています。
このように,GenServerを用いることで非常に簡潔な書き方でメッセージパッシングが実現できます。
さて,続いてTCPサーバを作ります。同ディレクトリにtcp-server.exを作ります。
実際にTCPで通信する部分は,このような感じです。
def accept(port) do
{:ok, socket} = :gen_tcp.listen(port, [:binary, packet: :line, active: false, reuseaddr: true])
Logger.info "Accepting connections on port #{port}"
loop_acceptor(socket)
end
def loop_acceptor(socket) do
pid = spawn(__MODULE__, :serve, [])
{:ok, client} = :gen_tcp.accept(socket)
send pid, client
loop_acceptor(socket)
end
def serve do
receive do
socket ->
socket
|> read_line
|> prescript_to_worker
|> write_line(socket)
send self, socket
serve
end
end
def read_line(socket) do
{:ok, data} = :gen_tcp.recv(socket, 0)
data
end
def write_line(line, socket) do
:gen_tcp.send(socket, line <> <<13, 10>>)
end
5行目でTCPサーバを開いて,そのソケットのPID(Process IDentifier)を受け取っています。
GenServerは使っていませんが,ここにもメッセージパッシングが使われています。
まず11行目のspawn/3 で送り先を指定し,PIDを発行します。
send/2 で,先ほど発行したPIDの送り先に向けて値を送ります。
送られてくる相手はreceive でその値を待ち構え,値が来たらその後の処理を実行します。
ただしreceive は使い捨てなので,再び値を待つようにするには再帰をして,自身に同じ値を送るようにします(25,26行目)。
こうして,多数のクライアントを受け入れ,さらに複数のメッセージをそこから受け取り,また返す仕組みができました。
これでサーバは完成しましたが,最後にこれらサーバをスーパーバイザに監視させる処理をします。
brain.exに戻って,その中のchildrenに今作った2つを入れて,ワーカーとして登録します。
children = [
worker(Brain.Worker, ["++++++++++[>+++++++>++++++++++>+++>+<<<<-]>++.>+.+++++++..+++.>++.<<+++++++++++++++.>.+++.------.--------.>+."]),
worker(Task, [Brain.TCPServer, :accept, [4040]])
]
サーバは,ターミナルから
$ mix run --no-halt
で動き出します。
クライアントサイド
続いてClojureによるクライアントサイドです。ターミナルで
$ lein new app brainf
と入力し,新規プロジェクトを作ります。
今回はClojureのcore.asyncライブラリを用います。
このライブラリは,名前の通り非同期処理(asynchronous processing)を書けるようにした公式のライブラリです。
project.cljのdependenciesに
:dependencies [[org.clojure/clojure "1.8.0"]
[org.clojure/core.async "0.2.395"]]
と入れて,ターミナルで
$ lein deps
とすると,leiningenが自動で依存関係を解決し,(今回では)core.asyncを使えるように環境を整えてくれます。
それでは,クライアントサイドを作っていきます。
client.cljをsrc/brainf/ に作ります。
名前空間を定義し,必要なものを導入していきます。
(ns brainf.client
(:import (java.net Socket)
(java.io PrintWriter InputStreamReader BufferedReader)
(java.lang NullPointerException))
(:require [clojure.core.async :as a :refer [<! >! <!! >!! chan go go-loop]]
[clojure.string :as str]
[brainf.interpreter :as i]))
core.asyncのチャンネルは,キューとなって>!や>!!で値を溜め,<!や<!!で値を放ちます。
値が溜まっていない場合は,溜まるまで放つのを待ってくれます。
!の数は,1つならノンブロッキング処理(定員を溢れても引き止める,goの中でのみ使える),2つならブロッキング処理(定員までしか入れない,goの外でも使える)です。
チャンネルを引数に取る以下のような関数を定義することでイベントハンドラみたいに使えると思います。
(defn printer-channel [c]
(go-loop []
(print (<! c))
(flush)
(recur)))
(defn reader-channel [c socket]
(let [in (BufferedReader. (InputStreamReader. (.getInputStream socket)))]
(go-loop [line (.readLine in)]
(>! c line)
(recur (.readLine in)))))
(defn writer-channel [c socket]
(let [out (PrintWriter. (.getOutputStream socket) true)]
(go-loop []
(.println out (<! c))
(recur))))
go-loopで,チャンネルに値が積まれたらすぐに命令を実行します。
printer-channelでは画面出力を,reader-channelではソケットからの受信を,writer-channelではソケットでの送信をチャンネルに担わせます。
go以下はその他の処理とは並行に処理されるので,ループで動作が止まることはありません。
java.net.Socketを利用してソケットを繋ぎ,CUI上でコマンドを受け付けます。
(defn connect [server]
(with-open [socket (Socket. (:host server) (:port server))]
(let [printerc (chan) readerc (chan) writerc (chan)]
(printer-channel printerc)
(reader-channel readerc socket)
(writer-channel writerc socket)
(>!! printerc "server connected.\nnokan > ")
(loop [cmd (read-line)]
(let [commands (str/split cmd #"\s")]
(case (first commands)
"init" (do (>!! writerc \I) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"show" (do (>!! writerc \S) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"add" (do (>!! writerc (str \A (second commands))) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"push" (do (>!! writerc (str \P (second commands))) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"amend" (do (>!! writerc (str \M (second commands) \| (nth commands 2) \| (nth commands 3))) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"replace" (do (>!! writerc (str \R (second commands) \| (nth commands 2))) (>!! printerc (str (<!! readerc) \newline "nokan > ")) (recur (read-line)))
"help" (do (>!! printerc (str help-message (:port server) "\n\nnokan > ")) (recur (read-line)))
"run" (do (>!! writerc \S) (run-code printerc readerc socket) (recur (read-line)))
"quit" nil
"exit" nil
(do (>!! printerc (str (first commands) " : command not found\nIf you want to see help, use \"help\" command.\nnokan > ")) (recur (read-line)))))))))
これでやりたいことはおおよそできました。
(Brainf***のインタプリタの部分はオマケなので割愛させていただきます)
あとはmain関数で呼び出すだけです。
core.cljはこのようにします。
(ns brainf.core
(:require [brainf.client :refer :all])
(:gen-class))
(defn -main
[& args]
(case (first args)
(connect localhost)))
コードができたら,起動です。
$ lein run
これで動き出すはずです。
実行可能JARファイルを作ることも可能です。
$ lein uberjar
とターミナルに入力すると,target/uberjar/ ディレクトリ内にstandaloneと名のついたJARファイルができます。
$ java -jar (実行可能JARファイル名).jar
で起動します。
完成品
(クリックでアニメーションが見られます。操作がぎこちないですがアプリケーションは問題なく動いています。)
別々のアプリで同じコードが共有されました。
また,サーバ(左上)でエラーが起きても,一瞬後には何事もなかったかのように動いています。
ソースコード全文は GitHubのページ にあります。
UDS通信でなんでも掲示板
続いて2つ目の制作ですが,UDS通信を用いた掲示板のようなものを作りました。
まずUDSですが,略さずいうとUNIX Domain Socketのことで,同じコンピュータ上でプロセス間通信を行う場合に用いられます。
詳しくは下の記事が参考になります。
調べなきゃ寝れない!と調べたら余計に寝れなくなったソケットの話 - Qiita
設計
サーバがJSON形式の文字列を受け取り,それを加工してGUI上に表示するものを作ります。
それだけです。
サーバサイド(Budgerigar)
ClojureはJavaのライブラリを使えるところが大きな強みの一つとなっています。
Mavenを調べ,junixsocketと呼ばれるJava用のUDS通信ライブラリを使うことにします。
$ lein new app budgerigar
として,プロジェクトを開始します。
project.cljに依存関係を書いて,解決させておきます。
:dependencies [[org.clojure/clojure "1.8.0"]
[com.kohlschutter.junixsocket/junixsocket-native-common "2.0.4"]
[org.clojure/data.json "0.2.6"]
[org.clojure/core.async "0.2.395"]
[com.taoensso/timbre "4.7.0"]]
GUIにはJavaのSwingを使うことにします。
gui.cljを作ります。コードは長いので重要な部分だけ抜萃していきます。
JPanelを作る関数を定義します。各種リスナーを搭載しています。
(defn gui-panel [reader]
(let [width 1024 height 768 messages (atom []) background-image (-> (Toolkit/getDefaultToolkit) (.getImage (io/resource "background.png")))]
(s/painter-channel reader messages width height)
(proxy [JPanel ActionListener MouseListener MouseMotionListener] []
(paintComponent [g]
(let [g2 (cast Graphics2D g)]
(doto g2
(.setRenderingHint RenderingHints/KEY_TEXT_ANTIALIASING RenderingHints/VALUE_TEXT_ANTIALIAS_GASP)
(.drawImage background-image 0 0 width height this))
(doall (map #(fill-rectangle-from-map g2 % width height) @messages))))
(actionPerformed [e]
(reset! messages (vec (keep fade-action @messages)))
(.repaint this))
(mouseEntered [e])
(mouseMoved [e]
(reset! messages (vec (sort-by :alpha (map #(mouse-on-message % e) @messages)))))
(mouseClicked [e]
(reset! messages (vec (remove #(= (last (filter :on-mouse @messages)) %) @messages))))
(mousePressed [e])
(mouseDragged [e])
(mouseReleased [e])
(mouseExited [e])
(getPreferredSize []
(Dimension. width height)))))
JSON文字列は,加工されてmessagesに格納されていきます。
Clojureでは,変更可能な状態をatomとして持つことができます。
これは便利な反面,副作用を許しているので扱いには注意しましょう。
このコードのようにリストの要素を増やしていくような使い方をする場合だと,肥大化してメモリを喰い荒らす恐れがありますので,必要のない要素はこまめに削除していく仕組みが必要です。
背景画像(background.png)のようなリソースは,resources/ ディレクトリに置いておくと (clojure.java.io/resource ファイル名) で取り出すことができます。
続いてウィンドウを表示させる部分ですが,このようになっています。
(defn set-gui [fps reader]
(let [frame (JFrame. "Budgerigar - my versatile bulletin")
panel (gui-panel reader)
menubar (make-menubar)
timer (Timer. (int (/ 1000 fps)) panel)
icon (-> (Toolkit/getDefaultToolkit) (.getImage (io/resource "icon.png")))]
(System/setProperty "apple.laf.useScreenMenuBar" "true")
(System/setProperty "com.apple.mrj.application.apple.menu.about.name" "Budgerigar")
(doto panel
(.setFocusable true)
(.addMouseListener panel)
(.addMouseMotionListener panel))
(doto frame
(.setDefaultCloseOperation javax.swing.WindowConstants/EXIT_ON_CLOSE)
(.setResizable false)
(.setIconImage icon)
(.add panel)
(.setJMenuBar menubar)
.pack
(.setVisible true))
(.start timer)))
System/setProperty を使ってOS Xに準拠した表示にしたかったのですが,できませんでした。
メニューバーは,後の機能拡張のために一応つけておきます。
110行目にあるTimerを設定しておくことによって,先ほどのJPanelのactionPerformedリスナーが一定時間ごとに発動し,ループができます。
最後にserver.cljを作り,UDSサーバを作ります。
実際のソースコードには文字列の加工やcore.asyncのチャンネルの設定などがありますが,サーバを作っている部分は次の8行です。
socket-file-srcで指定されたファイル(指定することで生成される)をソケットアドレスとし,ソケットを開きます。
(defn make-server [socket-file-src]
(let [server (AFUNIXServerSocket/newInstance)]
(->> socket-file-src
io/file
AFUNIXSocketAddress.
(.bind server))
(tim/info "created server")
server))
あとは,main関数に次のように書きこむだけです。
(ns budgerigar.core
(:require [clojure.core.async :as async :refer [<! >! <!! chan go-loop]]
[budgerigar.server :as s]
[budgerigar.gui :as gui])
(:gen-class))
(defn -main
[& args]
(case (first args)
(with-open [server (s/make-server s/budgie-socket-file)]
(let [gatekeeper (chan) reader (chan)]
(s/gatekeeper-channel gatekeeper server)
(go-loop [client (<! gatekeeper)] (s/reader-channel reader client) (recur (<! gatekeeper)))
(gui/set-gui 30 reader)
(loop [] (recur))))))
GUIアプリの場合は,実行可能JARファイルを作ったらそれをダブルクリックでも開くことができます。
クライアントサイド(Hyacinth Macaw)
今回,クライアントサイドにはTwitterクライアントを使用します。
JavaにはTwitter4Jという便利なTwitterAPIをラッピングしたライブラリがあるので使わせていただきます。
サーバにJSONを送るほか,CUIでTwitterクライアントとして動作できるようにします。
同様に,leiningenからアプリを作成します。
$ lein new app hyacinth-macaw
project.cljにdependenciesを追加します。
:dependencies [[org.clojure/clojure "1.8.0"]
[org.clojure/core.async "0.2.395"]
[org.clojure/data.json "0.2.6"]
[org.twitter4j/twitter4j-core "4.0.4"]
[org.twitter4j/twitter4j-stream "4.0.4"]
[com.kohlschutter.junixsocket/junixsocket-native-common "2.0.4"]]
Twitterはメインではないので,詳しくはソースコードを見てください。
正直僕が解説するよりも,ソースコードと公式ドキュメントを見比べたほうが早いと思います。
少しだけ解説すると,OAuth認証をして,出てきたPINコードをCUIに入力させることで,Twitterの情報を入手します。
認証画面へは,java.net.URIのbrowseメソッドで既定のブラウザを使って飛ばします。
ストリーミングを取得し,TLに流れてきたツイートやいいねされたツイートをJSON形式に加工してサーバに送ります。
UDS通信のクライアントサイドですが,ほとんどサーバサイドと同じです。
(def ^:private socket
(let [client (AFUNIXSocket/newInstance)]
(->> budgie-socket-file io/file AFUNIXSocketAddress. (.connect client))
client))
(defn sender-channel [c]
(let [out (-> socket .getOutputStream (PrintWriter. true))]
(go-loop [data (<! c)]
(.println out data)
(recur (<! c)))))
10行目で使うファイルは,サーバサイドのものと同じファイルにしてください。
UNIX Domain Socketでは,このファイルを拠点にして通信します。
.bind と .connect のメソッドの違いに注意してください。
サーバへの送信は,sender-channelの引数となるcore.asyncのチャンネルを通して行われます。
最後にcore.cljをこのようにして,完成です。
(ns hyacinth-macaw.core
(:require [hyacinth-macaw.twitter :as tw])
(:gen-class))
(defn -main
[& args]
(let [client (tw/make-twitter)]
(doto (:stream client) tw/add-listener .user)
(tw/command-twitter-action (:twitter client))))
完成品
Twitterをストリーミングして,ツイートは30秒間,いいねの通知は20秒間Budgerigarの画面に出続けます。
また,カーソルをメッセージの枠内に持っていくとフェードアウトがリセットされて,クリックでメッセージを非表示にすることもできます。
ちなみに,Twitterクライアント(Hyacinth Macaw)の方はツイートをBudgerigarに送る以外にも機能が追加されていて,例えばuserコマンドでこのようにユーザーのTLを見ることができたり,
あるいはClojureのコードを評価した結果をツイートすることもできます(これは意外と便利)。
ソースコードは,それぞれ以下のリンク先にあります。
Budgerigar
Hyacinth Macaw
まとめ
簡潔かつ高機能な言語は開発を手早く進めやすいという最も基本的な利点があります。
気になる人は使ってみるといいと思います。
また今回はこんな変なアプリケーションばかりでしたが,プロセス間通信はもっとずっと便利なものを作れる可能性があるので,この記事を参考にこういった手法を使ってみるような人が現れると嬉しいです。
明日は,Durunさんとthorium129さんの記事です。お楽しみに。