ELM327 なアダプタ、本日到着。python-OBD についてソースコード掘削してメモをアレしてますので、以下に控えを。
README.md
ソースコードが投入されているディレクトリに存在する README.md
を以下に引用します。どのような流れでシリアルポートとのやりとりを行っているかがイメージできます。
API
┌───────────────────────┐
│ obd.py / async.py │
└───┰───────────────────┘
┃ ▲
┃ ┃
┌───╂───────────────╂───┐ ┌─────────────────┐ ┌────────────────────┐
│ ┃ ┗━━━┿━━━━━━┥ │◀ ━━━━━━━┥ │
│ ┃ OBDCommand.py │ │ decoders.py │ (maybe) │ UnitsAndScaling.py │
│ ┃ ┏━━━┿━━━━ ▶│ ┝━━━━━━━ ▶│ │
└───╂───────────────╂───┘ └─────────────────┘ └────────────────────┘
┃ ┃
┃ ┃
┌───╂───────────────╂───┐ ┌─────────────────┐
│ ┃ ┗━━━┿━━━━━━┥ │
│ ┃ elm327.py │ │ protocol/ │
│ ┃ ┏━━━┿━━━━ ▶│ │
└───╂───────────────╂───┘ └─────────────────┘
┃ ┃
▼ ┃
┌───────────────────┸───┐
│ pyserial │
└───────────────────────┘
Serial Port
Not pictured:
commands.py
: defines the various OBD commands, and which decoder they usecodes.py
: stores tables of standardized values needed bydecoders.py
(mostly check-engine codes)OBDResponse.py
: defines structures/objects returned by the API in response to a query.
また、Github プロジェクトの README.md
にある基本的な使い方についても以下に引用します。
import obd
connection = obd.OBD() # auto-connects to USB or RF port
cmd = obd.commands.SPEED # select an OBD command (sensor)
response = connection.query(cmd) # send the command, and parse the response
print(response.value) # returns unit-bearing values thanks to Pint
print(response.value.to("mph")) # user-friendly unit conversions
また、上記コードにある、resonse という変数が保持している ODBResponse 型のオブジェクトは value という属性を保持していることが分かりますが、この属性のデータ型は Pint の Quantity というデータ型となっています。詳細については Pint Values というドキュメントを確認願います。
以降については、上記サンプルの流れに沿ってソースコードを確認して行く形で実装の確認を行っていく形で情報を記載していくこととします。
- obd.OBD() の呼び出しによる USB ポートとの接続
- OBDCommand オブジェクトを使ったコマンドの生成について
- obd#query の処理について
USB ポートとの接続
基本的な使い方、にある以下の部分がエントリポイントです。
connection = obd.OBD() # auto-connects to USB or RF port
OBD というクラスのコンストラクタは以下です。
def __init__(self, portstr=None, baudrate=None, protocol=None, fast=True):
self.interface = None
self.supported_commands = set(commands.base_commands())
self.fast = fast # global switch for disabling optimizations
self.__last_command = b"" # used for running the previous command with a CR
self.__frame_counts = {} # keeps track of the number of return frames for each command
logger.info("======================= python-OBD (v%s) =======================" % __version__)
self.__connect(portstr, baudrate, protocol) # initialize by connecting and loading sensors
self.__load_commands() # try to load the car's supported commands
logger.info("===================================================================")
supported_commands 属性に設定されるのは commands.py
にて list of commands that should always be supported by the ELM327
として base_commands
から返却される以下のコマンドとなっています。
self.PIDS_A,
self.MIDS_A,
self.GET_DTC,
self.CLEAR_DTC,
self.GET_CURRENT_DTC,
self.ELM_VERSION,
self.ELM_VOLTAGE,
それぞれの内容について一応確認しておきます。ODBCommand クラスのコンストラクタですが、以下の順で値を渡す形となっています。
- name : human readable name (also used as key in commands dict)
- desc : human readable description
- command : command string
- bytes : number of bytes expected in return
- decode : decoding function
- ecu : ECU ID from which this command expects messages from
fast : can an extra digit be added to the end of the command?
PIDS_A
OBDCommand("PIDS_A", "Supported PIDs [01-20]", b"0100", 6, pid, ECU.ENGINE, True),
- MIDS_A
OBDCommand("MIDS_A", "Supported MIDs [01-20]", b"0600", 0, pid, ECU.ALL, False),
- GET_DTC
OBDCommand("GET_DTC", "Get DTCs", b"03", 0, dtc, ECU.ALL, False),
- CLEAR_DTC
OBDCommand("CLEAR_DTC", "Clear DTCs and Freeze data", b"04", 0, drop, ECU.ALL, False),
- GET_CURRENT_DTC
OBDCommand("GET_CURRENT_DTC", "Get DTCs from the current/last driving cycle", b"07", 0, ECU.ALL, false),
- ELM_VERSION
OBDCommand("ELM_VERSION", "ELM327 version string", b"ATI", 0, raw_string, ECU.UNKNOWN, False),
- ELM_VOLTAGE
OBDCommand("ELM_VOLTAGE", "Voltage detected by OBD-II adapter", b"ATRV", 0, elm_voltage, ECU.UNKNOWN, False),
mode6 が含まれていないのは、これが CAN のみのため、ということがあるかもしれません(根拠は不明です)。これらを元にして使用可能なコマンドを列挙していく形となります。
属性の初期化を行った後、以下の手続きを呼び出しています。
self.__connect(portstr, baudrate, protocol) # initialize by connecting and loading sensors
self.__load_commands() # try to load the car's supported commands
順に確認していきます。
__connect
手続き
名前の通り、シリアルポートとの接続を行います。引数の portstr が無指定の場合、シリアルポートの探索を行い、それをもとにして接続を行います。処理の定義は以下となっています。
def __connect(self, portstr, baudrate, protocol):
"""
Attempts to instantiate an ELM327 connection object.
"""
if portstr is None:
logger.info("Using scan_serial to select port")
portnames = scan_serial()
logger.info("Available ports: " + str(portnames))
if not portnames:
logger.warning("No OBD-II adapters found")
return
for port in portnames:
logger.info("Attempting to use port: " + str(port))
self.interface = ELM327(port, baudrate, protocol)
if self.interface.status() >= OBDStatus.ELM_CONNECTED:
break # success! stop searching for serial
else:
logger.info("Explicit port defined")
self.interface = ELM327(portstr, baudrate, protocol)
# if the connection failed, close it
if self.interface.status() == OBDStatus.NOT_CONNECTED:
# the ELM327 class will report its own errors
self.close()
scan_serial 手続きは utils.py
にて定義されています。プラットフォーム別でデバイス名を列挙して開くことができるものを列挙する形となっています(実装の引用は略します)。シリアルポートとのやりとりについては先に引用した図の通り、ELM327 クラスを用います。ELM327 クラスのコンストラクタの実装ですが以下に列挙する内容となっています(長いので引用は略します)。
- 属性の初期化
- serial クラスを用いてポートを open します
次に以下の AT コマンドを発行し、異常があれば終了します。
- ATZ (reset)
- ATE0 (echo off)
- ATH1 (headers ON)
- ATL0 (linefeeds OFF)
この時点で ELM への接続は成功していますので、__status
属性に OBDStatus.ELM_CONNECTED
を設定します。ただし、この時点では車両との接続は完了していませんので、車両との接続のために set_protocol
を呼び出します。この手続が True
を戻した場合、__status
属性に OBDStatus.CAR_CONNECTED
を設定し、接続処理は終了となります。
set_protocol
手続き
引数で渡される protocol
が None であれば自動設定を行う auto_protocol
を呼び出し、そうでなければ manual_protocol
を呼び出して __protocol
属性の設定を行います。設定されるオブジェクトは _SUPPORTED_PROTOCOLS
属性として定義されている辞書に設定されている以下のいずれかのオブジェクトが設定されます。また、コンストラクタの引数には ”0100” コマンドの結果が渡されます。
_SUPPORTED_PROTOCOLS = {
#"0" : None, # Automatic Mode. This isn't an actual protocol. If the
# ELM reports this, then we don't have enough
# information. see auto_protocol()
"1" : SAE_J1850_PWM,
"2" : SAE_J1850_VPW,
"3" : ISO_9141_2,
"4" : ISO_14230_4_5baud,
"5" : ISO_14230_4_fast,
"6" : ISO_15765_4_11bit_500k,
"7" : ISO_15765_4_29bit_500k,
"8" : ISO_15765_4_11bit_250k,
"9" : ISO_15765_4_29bit_250k,
"A" : SAE_J1939,
#"B" : None, # user defined 1
#"C" : None, # user defined 2
}
# used as a fallback, when ATSP0 doesn't cut it
_TRY_PROTOCOL_ORDER = [
"6", # ISO_15765_4_11bit_500k
"8", # ISO_15765_4_11bit_250k
"1", # SAE_J1850_PWM
"7", # ISO_15765_4_29bit_500k
"9", # ISO_15765_4_29bit_250k
"2", # SAE_J1850_VPW
"3", # ISO_9141_2
"4", # ISO_14230_4_5baud
"5", # ISO_14230_4_fast
"A", # SAE_J1939
]
manual_protocol
の定義を以下に引用します。
def manual_protocol(self, protocol):
r = self.__send(b"ATTP" + protocol.encode())
r0100 = self.__send(b"0100")
if not self.__has_message(r0100, "UNABLE TO CONNECT"):
# success, found the protocol
self.__protocol = self._SUPPORTED_PROTOCOLS[protocol](r0100)
return True
return False
__load_commands
手続き
この手続で有効な PID を OBDCommand#pid_getters
手続きにより検索したものを列挙して supported_commands に追加します。以下の順で処理を行います。
- supported command を列挙してその要素について処理を行う
- supported command または CAN のみに実装されたコマンドの場合はスルー
- コマンドを実行してレスポンスを取得、レスポンスが null の場合 invalid なコマンドとみなす
- valid なコマンドの場合、戻された value を順に取り出して以下の処理を行う - mode、pid を取得して Command#has_pid が True の場合、supported_commands に追加
- mode が 1 の場合、mode 2 への追加判定を行い、必要であれば追加
手続き定義を以下に引用します。また、OBD.query
の詳細については別の説で詳細に確認します。
def __load_commands(self):
"""
Queries for available PIDs, sets their support status,
and compiles a list of command objects.
"""
if self.status() != OBDStatus.CAR_CONNECTED:
logger.warning("Cannot load commands: No connection to car")
return
logger.info("querying for supported commands")
pid_getters = commands.pid_getters()
for get in pid_getters:
# PID listing commands should sequentialy become supported
# Mode 1 PID 0 is assumed to always be supported
if not self.test_cmd(get, warn=False):
continue
# when querying, only use the blocking OBD.query()
# prevents problems when query is redefined in a subclass (like Async)
response = OBD.query(self, get)
if response.is_null():
logger.info("No valid data for PID listing command: %s" % get)
continue
# loop through PIDs bitarray
for i, bit in enumerate(response.value):
if bit:
mode = get.mode
pid = get.pid + i + 1
if commands.has_pid(mode, pid):
self.supported_commands.add(commands[mode][pid])
# set support for mode 2 commands
if mode == 1 and commands.has_pid(2, pid):
self.supported_commands.add(commands[2][pid])
logger.info("finished querying with %d commands supported" % len(self.supported_commands))
commands.pid_getters から戻されるのは OBDCommand オブジェクトの配列です。また、OBDCommand については以下なメソッドが定義されています。
@property
def mode(self):
if len(self.command) >= 2 and \
isHex(self.command.decode()):
return int(self.command[:2], 16)
else:
return None
@property
def pid(self):
if len(self.command) > 2 and \
isHex(self.command.decode()):
return int(self.command[2:], 16)
else:
return None
また、commands.pid_getters の定義は以下の通りとなっており
def pid_getters(self):
""" returns a list of PID GET commands """
getters = []
for mode in self.modes:
getters += [ cmd for cmd in mode if (cmd and cmd.decode == pid) ]
return getters
以下の ODBCommand オブジェクトを要素として持つ配列が戻されるものと思われます (mode2 は省略)。
- PIDS_A
OBDCommand("PIDS_A", "Supported PIDs [01-20]", b"0100", 6, pid, ECU.ENGINE, True),
- PIDS_B
OBDCommand("PIDS_B", "Supported PIDs [21-40]", b"0120", 6, pid, ECU.ENGINE, True),
- PIDS_C
OBDCommand("PIDS_C", "Supported PIDs [41-60]", b"0140", 6, pid, ECU.ENGINE, True),
- MIDS_A
OBDCommand("MIDS_A", "Supported MIDs [01-20]", b"0600", 0, pid, ECU.ALL, False),
- MIDS_B
OBDCommand("MIDS_B", "Supported MIDs [21-40]", b"0620", 0, pid, ECU.ALL, False),
- MIDS_C
OBDCommand("MIDS_C", "Supported MIDs [41-60]", b"0640", 0, pid, ECU.ALL, False),
- MIDS_D
OBDCommand("MIDS_D", "Supported MIDs [61-80]", b"0660", 0, pid, ECU.ALL, False),
- MIDS_E
OBDCommand("MIDS_E", "Supported MIDs [81-A0]", b"0680", 0, pid, ECU.ALL, False),
- MIDS_F
OBDCommand("MIDS_F", "Supported MIDs [A1-C0]", b"06A0", 0, pid, ECU.ALL, False),
OBD クラスのコンストラクタの処理の説明については以上となります。
コマンドの生成
基本的な使い方にある以下の部分が使用例です。
cmd = obd.commands.SPEED # select an OBD command (sensor)
Commands クラスは以下の mode について ODBCommand オブジェクトの配列を modes という属性として保持しています (mode5、mode8 の追加は可能と考えていますが未確認です)。
- mode1
- mode2
- mode3
- mode4
- mode6
- mode7
- mode9
また、この配列はオブジェクトの参照から辞書としてアクセス可能な形にもなっています。これらは Commands クラスのコンストラクタにて設定されます。また、__getitem__
という手続きが定義されており、この手続定義のコメントにもある通り、以下な形でのアクセスが可能となっています。
obd.commands.RPM
obd.commands["RPM"]
obd.commands[1][12] # mode 1, PID 12 (RPM)
ODBCommand クラスは以下の属性を保持しています(コンストラクタから引用)。
self.name = name # human readable name (also used as key in commands dict)
self.desc = desc # human readable description
self.command = command # command string
self.bytes = _bytes # number of bytes expected in return
self.decode = decoder # decoding function
self.ecu = ecu # ECU ID from which this command expects messages from
self.fast = fast # can an extra digit be added to the end of the command? (to make the ELM return early)
この他に以下の手続きの形で定義されている属性もあります。
- mode
- pid
decode 属性には手続きオブジェクトが格納されます。基本的には decoders.py
で定義されている手続きが設定されます。これらはコマンドのレスポンスを取り扱う際に使用されます。例として回転数を取得するコマンドは以下の OBDCommand オブジェクトになるのですが
OBDCommand("RPM", "Engine RPM", b"010C", 4, uas(0x07), ECU.ENGINE, True),
decode 属性は uas(0x07)
となっています。定義一切を以下に引用します。
"""
Some decoders are simple and are already implemented in the Units And Scaling
tables (used mainly for Mode 06). The uas() decoder is a wrapper for any
Unit/Scaling in that table, simply to avoid redundant code.
"""
def uas(id):
""" get the corresponding decoder for this UAS ID """
return functools.partial(decode_uas, id=id)
def decode_uas(messages, id):
d = messages[0].data[2:] # chop off mode and PID bytes
return UAS_IDS[id](d)
UAS_IDS
は UnitsAndScaling.py
にて定義されています。キーが 0x07 の定義は以下です。
0x07 : UAS(False, 0.25, Unit.rpm),
クラス定義一切を以下に引用します。
class UAS():
"""
Class for representing a Unit and Scale conversion
Used in the decoding of Mode 06 monitor responses
"""
def __init__(self, signed, scale, unit, offset=0):
self.signed = signed
self.scale = scale
self.unit = unit
self.offset = offset
def __call__(self, _bytes):
value = bytes_to_int(_bytes)
if self.signed:
value = twos_comp(value, len(_bytes) * 8)
value *= self.scale
value += self.offset
return Unit.Quantity(value, self.unit)
回転数の場合、与えられた byte の並びを int に変換して 1⁄4 することが分かります。こうした実装を盛り込むことで
print(response.value.to("mph")) # user-friendly unit conversions
という操作で value の 1⁄4 の値が表示される、ということを可能にしています。OBDCommand クラスのオブジェクトはこのように受信後のデータをどのように表現するか、という手続きも属性として保持しています。
Pint のドキュメントは以下です。
コマンドの送信とレスポンスのパースについて
基本的な使い方、にある以下の部分がエントリポイントです。
response = connection.query(cmd) # send the command, and parse the response
ここでは OBD.query()
が何をしているのか、を具体的に確認していきます。ざっくりした流れとしては以下となります(エラーチェックなどは除いています)。
__build_command_string
によりコマンド文字列を取得ELM327#send_and_parse
にコマンド文字列を渡し、コマンド送信及び戻されたメッセージのパースを行いますOBDCommand#__call__
により戻されたメッセージから ODBResponse オブジェクトを生成し、それを戻します
ODBRespose オブジェクト生成時に上記 decode の処理も呼び出されます。まず、ODB#query の実装を以下に引用します。
def query(self, cmd, force=False):
"""
primary API function. Sends commands to the car, and
protects against sending unsupported commands.
"""
if self.status() == OBDStatus.NOT_CONNECTED:
logger.warning("Query failed, no connection available")
return OBDResponse()
# if the user forces, skip all checks
if not force and not self.test_cmd(cmd):
return OBDResponse()
# send command and retrieve message
logger.info("Sending command: %s" % str(cmd))
cmd_string = self.__build_command_string(cmd) # 1.
messages = self.interface.send_and_parse(cmd_string) # 2.
# if we're sending a new command, note it
# first check that the current command WASN'T sent as an empty CR
# (CR is added by the ELM327 class)
if cmd_string:
self.__last_command = cmd_string
# if we don't already know how many frames this command returns,
# log it, so we can specify it next time
if cmd not in self.__frame_counts:
self.__frame_counts[cmd] = sum([len(m.frames) for m in messages])
if not messages:
logger.info("No valid OBD Messages returned")
return OBDResponse()
return cmd(messages) # compute a response object # 3.
コメント追加していますが、末端に番号が付いている部分が箇条書きにて列挙した部分となります。以降にてそれぞれの手続きの実装を確認します。
OBD#__build_command_string
この手続きでは引数の ODBCommand オブジェクトの command 属性を戻します。ただし、以下の整形を行う場合があります。
- コマンドが戻すフレーム数がわかっているのであればその数だけ正確に待つよう設定します。ELM からのタイムアウトを回避し、クエリを高速化することができます
- 前回 CR を送信したのであれば同様にする (CR は ELM327 クラスで追加される)
以下に実装を引用します。
def __build_command_string(self, cmd):
""" assembles the appropriate command string """
cmd_string = cmd.command
# if we know the number of frames that this command returns,
# only wait for exactly that number. This avoids some harsh
# timeouts from the ELM, thus speeding up queries.
if self.fast and cmd.fast and (cmd in self.__frame_counts):
cmd_string += str(self.__frame_counts[cmd]).encode()
# if we sent this last time, just send a CR
# (CR is added by the ELM327 class)
if self.fast and (cmd_string == self.__last_command):
cmd_string = b""
return cmd_string
ELM327#send_and_parse
以下の処理を行う形となっています。
- コマンド送信処理を呼び出します
set_protocol
で設定された Protocol クラスの__call__
を呼び出します- 戻されたメッセージを返却します
実装を以下に引用します。
def send_and_parse(self, cmd):
"""
send() function used to service all OBDCommands
Sends the given command string, and parses the
response lines with the protocol object.
An empty command string will re-trigger the previous command
Returns a list of Message objects
"""
if self.__status == OBDStatus.NOT_CONNECTED:
logger.info("cannot send_and_parse() when unconnected")
return None
lines = self.__send(cmd)
messages = self.__protocol(lines)
return messages
ここで呼び出される __send
は何をするか、というと
- シリアルポートへのメッセージ送信
- delay が設定しているのであれば待機
- シリアルポートからメッセージ受信
となっています。実装は以下です。また、__write
及び __read
手続きの実装の引用は略します。
def __send(self, cmd, delay=None):
"""
unprotected send() function
will __write() the given string, no questions asked.
returns result of __read() (a list of line strings)
after an optional delay.
"""
self.__write(cmd)
if delay is not None:
logger.debug("wait: %d seconds" % delay)
time.sleep(delay)
return self.__read()
受信されたメッセージは Protocol#__call__
に渡されます。実装コメントによれば以下な手順で処理が行われます。
- preprocess
- handle valid OBD lines
- handle invalid lines (probably from the ELM)
まず、preprocess の部分では OBD 以外の行(「NO DATA」、「CAN ERROR」、「UNABLE TO CONNECT」などのメッセージ)と OBD の行を切り分けます。以降では OBD の行を valid、それ以外を invalid として処理を行います。valid な OBD の行については以下の手順で処理を行っています。
- 行ごとに取り出して Frame オブジェクトに変換し、
parse_frame
を呼び出します。これはサブクラスで実装されるメソッドです。この手続は重要な処理を行っているため、別途内容について確認します - Frame オブジェクトを tx_id 属性によりグルーピングします
- グループ単位で取り出して Message オブジェクトに変換し、
parse_message
を呼び出します。True を戻す場合、返却対象の配列に追加します
最後に messages という配列を返却して処理終了となります。手続きの実装について以下に引用します。
def __call__(self, lines):
"""
Main function
accepts a list of raw strings from the car, split by lines
"""
# ---------------------------- preprocess ----------------------------
# Non-hex (non-OBD) lines shouldn't go through the big parsers,
# since they are typically messages such as: "NO DATA", "CAN ERROR",
# "UNABLE TO CONNECT", etc, so sort them into these two lists:
obd_lines = []
non_obd_lines = []
for line in lines:
line_no_spaces = line.replace(' ', '')
if isHex(line_no_spaces):
obd_lines.append(line_no_spaces)
else:
non_obd_lines.append(line) # pass the original, un-scrubbed line
# ---------------------- handle valid OBD lines ----------------------
# parse each frame (each line)
frames = []
for line in obd_lines:
frame = Frame(line)
# subclass function to parse the lines into Frames
# drop frames that couldn't be parsed
if self.parse_frame(frame):
frames.append(frame)
# group frames by transmitting ECU
# frames_by_ECU[tx_id] = [Frame, Frame]
frames_by_ECU = {}
for frame in frames:
if frame.tx_id not in frames_by_ECU:
frames_by_ECU[frame.tx_id] = [frame]
else:
frames_by_ECU[frame.tx_id].append(frame)
# parse frames into whole messages
messages = []
for ecu in frames_by_ECU:
# new message object with a copy of the raw data
# and frames addressed for this ecu
message = Message(frames_by_ECU[ecu])
# subclass function to assemble frames into Messages
if self.parse_message(message):
# mark with the appropriate ECU ID
message.ecu = self.ecu_map.get(ecu, ECU.UNKNOWN)
messages.append(message)
# ----------- handle invalid lines (probably from the ELM) -----------
for line in non_obd_lines:
# give each line its own message object
# messages are ECU.UNKNOWN by default
messages.append( Message([ Frame(line) ]) )
return messages
以降では上記手続きにて呼び出している parse_frame
および parse_message
について確認します。定義元は protocol_can.py
とします。また、手続きの規模もそれなりなので節を分けて記載します。
parse_frame
この手続では引数で受け取った Frame オブジェクトについてデータの整形を行い、必要な属性にデータを設定します。また、データに不備がある場合には False を戻し、そうでない場合には True を戻します。手順としては以下となります。
- raw 属性よりデータを取り出して初期整形 (11-bit CAN の場合、先頭に 5-byte の 0 を追加)
- 取り出した raw の length 確認し、奇数ならエラーとして扱う
- 取り出した raw を byte 配列に変換し、length の確認 (6 以上、12 以下)
- header 情報 (priority、addr_mode、rx_id、tx_id 属性) の取得
- data 属性の取り出し
- PCI byte を取り出して type 属性に格納して値の確認し、data_len または seq_index 属性に値を設定
- ここまでで問題なければ True を戻して終了
以下に手続きの定義を引用します。コメントに詳細な情報が記載されているので確認することで受信したデータをどのように使っているかが分かります。
def parse_frame(self, frame):
raw = frame.raw
# pad 11-bit CAN headers out to 32 bits for consistency,
# since ELM already does this for 29-bit CAN headers
# 7 E8 06 41 00 BE 7F B8 13
# to:
# 00 00 07 E8 06 41 00 BE 7F B8 13
if self.id_bits == 11:
raw = "00000" + raw
# Handle odd size frames and drop
if len(raw) & 1:
logger.debug("Dropping frame for being odd")
return False
raw_bytes = bytearray(unhexlify(raw))
# check for valid size
if len(raw_bytes) < 6:
# make sure that we have at least a PCI byte, and one following byte
# for FF frames with 12-bit length codes, or 1 byte of data
#
# 00 00 07 E8 10 20 ...
logger.debug("Dropped frame for being too short")
return False
if len(raw_bytes) > 12:
logger.debug("Dropped frame for being too long")
return False
# read header information
if self.id_bits == 11:
# Ex.
# [ ]
# 00 00 07 E8 06 41 00 BE 7F B8 13
frame.priority = raw_bytes[2] & 0x0F # always 7
frame.addr_mode = raw_bytes[3] & 0xF0 # 0xD0 = functional, 0xE0 = physical
if frame.addr_mode == 0xD0:
#untested("11-bit functional request from tester")
frame.rx_id = raw_bytes[3] & 0x0F # usually (always?) 0x0F for broadcast
frame.tx_id = 0xF1 # made-up to mimic all other protocols
elif raw_bytes[3] & 0x08:
frame.rx_id = 0xF1 # made-up to mimic all other protocols
frame.tx_id = raw_bytes[3] & 0x07
else:
#untested("11-bit message header from tester (functional or physical)")
frame.tx_id = 0xF1 # made-up to mimic all other protocols
frame.rx_id = raw_bytes[3] & 0x07
else: # self.id_bits == 29:
frame.priority = raw_bytes[0] # usually (always?) 0x18
frame.addr_mode = raw_bytes[1] # DB = functional, DA = physical
frame.rx_id = raw_bytes[2] # 0x33 = broadcast (functional)
frame.tx_id = raw_bytes[3] # 0xF1 = tester ID
# extract the frame data
# [ Frame ]
# 00 00 07 E8 06 41 00 BE 7F B8 13
frame.data = raw_bytes[4:]
# read PCI byte (always first byte in the data section)
# v
# 00 00 07 E8 06 41 00 BE 7F B8 13
frame.type = frame.data[0] & 0xF0
if frame.type not in [self.FRAME_TYPE_SF,
self.FRAME_TYPE_FF,
self.FRAME_TYPE_CF]:
logger.debug("Dropping frame carrying unknown PCI frame type")
return False
if frame.type == self.FRAME_TYPE_SF:
# single frames have 4 bit length codes
# v
# 00 00 07 E8 06 41 00 BE 7F B8 13
frame.data_len = frame.data[0] & 0x0F
# drop frames with no data
if frame.data_len == 0:
return False
elif frame.type == self.FRAME_TYPE_FF:
# First frames have 12 bit length codes
# v vv
# 00 00 07 E8 10 20 49 04 00 01 02 03
frame.data_len = (frame.data[0] & 0x0F) << 8
frame.data_len += frame.data[1]
# drop frames with no data
if frame.data_len == 0:
return False
elif frame.type == self.FRAME_TYPE_CF:
# Consecutive frames have 4 bit sequence indices
# v
# 00 00 07 E8 21 04 05 06 07 08 09 0A
frame.seq_index = frame.data[0] & 0x0F
return True
parse_message
この手続では引数で受け取った Message オブジェクトについてデータの整形を行い、必要な属性にデータを設定します (data 属性にはデータ長は除かれた本来のデータが格納されます)。また、データに不備がある場合には False を戻し、そうでない場合には True を戻します。
また、この手続では複数の Frame にまたがるメッセージを一つにする処理も行っています。以下に単一 Frame の場合の処理のみを引用します。
def parse_message(self, message):
frames = message.frames
if len(frames) == 1:
frame = frames[0]
if frame.type != self.FRAME_TYPE_SF:
logger.debug("Recieved lone frame not marked as single frame")
return False
# extract data, ignore PCI byte and anything after the marked length
# [ Frame ]
# [ Data ]
# 00 00 07 E8 06 41 00 BE 7F B8 13 xx xx xx xx, anything else is ignored
message.data = frame.data[1:1+frame.data_len]
OBDCommand#__call__
この手続きは端的に言うと引数で受け取った Message 型のオブジェクト配列を ODBResponse オブジェクトに変換し、その value 属性に decode して取得した Quantity オブジェクトを設定するものです。以下に実装を引用します。
def __call__(self, messages):
# filter for applicable messages (from the right ECU(s))
for_us = lambda m: (self.ecu & m.ecu) > 0
messages = list(filter(for_us, messages))
# guarantee data size for the decoder
for m in messages:
self.__constrain_message_data(m)
# create the response object with the raw data recieved
# and reference to original command
r = OBDResponse(self, messages)
if messages:
r.value = self.decode(messages)
else:
logger.info(str(self) + " did not recieve any acceptable messages")
return r
この手続きで decode 属性に設定された手続きオブジェクトを呼び出してメッセージに対して意味付けを行っています。decode 属性に設定される手続きを確認すると、Message オブジェクトの data 属性から情報を取り出して整形を行っている事がわかります。以下に例を引用します。
def decode_uas(messages, id):
d = messages[0].data[2:] # chop off mode and PID bytes
return UAS_IDS[id](d)