summaryrefslogtreecommitdiff
path: root/epan/dissectors/packet-kafka.c
diff options
context:
space:
mode:
authorDmitry Lazurkin <dilaz03@gmail.com>2016-12-10 16:45:10 +0300
committerMartin Mathieson <martin.r.mathieson@googlemail.com>2016-12-17 22:54:55 +0000
commitced54aeb067065e29a621be2b6a1192e25a6e1bf (patch)
tree9d7e717eae0d5e2d0577a7c277e38f92f84fe0e2 /epan/dissectors/packet-kafka.c
parenta5374d8955a0cc22005cee761c995cc72b638b15 (diff)
downloadwireshark-ced54aeb067065e29a621be2b6a1192e25a6e1bf.tar.gz
kafka: add expert info about unsupported api key and version
Change-Id: I622e6f06529377e089cbeeb83d926135f983d3f3 Reviewed-on: https://code.wireshark.org/review/19194 Petri-Dish: Martin Mathieson <martin.r.mathieson@googlemail.com> Tested-by: Petri Dish Buildbot <buildbot-no-reply@wireshark.org> Reviewed-by: Martin Mathieson <martin.r.mathieson@googlemail.com>
Diffstat (limited to 'epan/dissectors/packet-kafka.c')
-rw-r--r--epan/dissectors/packet-kafka.c228
1 files changed, 186 insertions, 42 deletions
diff --git a/epan/dissectors/packet-kafka.c b/epan/dissectors/packet-kafka.c
index 3991364a96..08b301794a 100644
--- a/epan/dissectors/packet-kafka.c
+++ b/epan/dissectors/packet-kafka.c
@@ -141,10 +141,26 @@ static int ett_kafka_configs = -1;
static int ett_kafka_config = -1;
static expert_field ei_kafka_request_missing = EI_INIT;
+static expert_field ei_kafka_unknown_api_key = EI_INIT;
+static expert_field ei_kafka_unsupported_api_version = EI_INIT;
static expert_field ei_kafka_message_decompress = EI_INIT;
static expert_field ei_kafka_bad_string_length = EI_INIT;
static expert_field ei_kafka_bad_bytes_length = EI_INIT;
+typedef gint16 kafka_api_key_t;
+typedef gint16 kafka_api_version_t;
+typedef gint16 kafka_error_t;
+typedef gint32 kafka_partition_t;
+typedef gint64 kafka_offset_t;
+
+typedef struct _kafka_api_info_t {
+ kafka_api_key_t api_key;
+ const char *name;
+ /* If api key is not supported then set min_version and max_version to -1 */
+ kafka_api_version_t min_version;
+ kafka_api_version_t max_version;
+} kafka_api_info_t;
+
#define KAFKA_PRODUCE 0
#define KAFKA_FETCH 1
#define KAFKA_OFFSETS 2
@@ -166,31 +182,56 @@ static expert_field ei_kafka_bad_bytes_length = EI_INIT;
#define KAFKA_API_VERSIONS 18
#define KAFKA_CREATE_TOPICS 19
#define KAFKA_DELETE_TOPICS 20
-static const value_string kafka_apis[] = {
- { KAFKA_PRODUCE, "Produce" },
- { KAFKA_FETCH, "Fetch" },
- { KAFKA_OFFSETS, "Offsets" },
- { KAFKA_METADATA, "Metadata" },
- { KAFKA_LEADER_AND_ISR, "LeaderAndIsr" },
- { KAFKA_STOP_REPLICA, "StopReplica" },
- { KAFKA_UPDATE_METADATA, "UpdateMetadata" },
- { KAFKA_CONTROLLED_SHUTDOWN, "ControlledShutdown" },
- { KAFKA_OFFSET_COMMIT, "OffsetCommit" },
- { KAFKA_OFFSET_FETCH, "OffsetFetch" },
- { KAFKA_GROUP_COORDINATOR, "GroupCoordinator" },
- { KAFKA_JOIN_GROUP, "JoinGroup" },
- { KAFKA_HEARTBEAT, "Heatbeat" },
- { KAFKA_LEAVE_GROUP, "LeaveGroup" },
- { KAFKA_SYNC_GROUP, "SyncGroup" },
- { KAFKA_DESCRIBE_GROUPS, "DescribeGroups" },
- { KAFKA_LIST_GROUPS, "ListGroups" },
- { KAFKA_SASL_HANDSHAKE, "SaslHandshake" },
- { KAFKA_API_VERSIONS, "ApiVersions" },
- { KAFKA_CREATE_TOPICS, "CreateTopics" },
- { KAFKA_DELETE_TOPICS, "DeleteTopics" },
- { 0, NULL }
+static const kafka_api_info_t kafka_apis[] = {
+ { KAFKA_PRODUCE, "Produce",
+ 0, 2 },
+ { KAFKA_FETCH, "Fetch",
+ 0, 3 },
+ { KAFKA_OFFSETS, "Offsets",
+ 0, 1 },
+ { KAFKA_METADATA, "Metadata",
+ 0, 2 },
+ { KAFKA_LEADER_AND_ISR, "LeaderAndIsr",
+ 0, 0 },
+ { KAFKA_STOP_REPLICA, "StopReplica",
+ 0, 0 },
+ { KAFKA_UPDATE_METADATA, "UpdateMetadata",
+ 0, 2 },
+ { KAFKA_CONTROLLED_SHUTDOWN, "ControlledShutdown",
+ 1, 1 },
+ { KAFKA_OFFSET_COMMIT, "OffsetCommit",
+ 0, 2 },
+ { KAFKA_OFFSET_FETCH, "OffsetFetch",
+ 0, 1 },
+ { KAFKA_GROUP_COORDINATOR, "GroupCoordinator",
+ 0, 0 },
+ { KAFKA_JOIN_GROUP, "JoinGroup",
+ 0, 1 },
+ { KAFKA_HEARTBEAT, "Heatbeat",
+ 0, 0 },
+ { KAFKA_LEAVE_GROUP, "LeaveGroup",
+ 0, 0 },
+ { KAFKA_SYNC_GROUP, "SyncGroup",
+ 0, 0 },
+ { KAFKA_DESCRIBE_GROUPS, "DescribeGroups",
+ 0, 0 },
+ { KAFKA_LIST_GROUPS, "ListGroups",
+ 0, 0 },
+ { KAFKA_SASL_HANDSHAKE, "SaslHandshake",
+ 0, 0 },
+ { KAFKA_API_VERSIONS, "ApiVersions",
+ 0, 0 },
+ { KAFKA_CREATE_TOPICS, "CreateTopics",
+ 0, 0 },
+ { KAFKA_DELETE_TOPICS, "DeleteTopics",
+ 0, 0 },
};
+/*
+ * Generated from kafka_apis. Add 1 to length for last dummy element.
+ */
+static value_string kafka_api_names[array_length(kafka_apis) + 1];
+
static const value_string kafka_errors[] = {
{ -1, "Unexpected Server Error" },
{ 0, "No Error" },
@@ -286,12 +327,6 @@ static range_t *current_kafka_tcp_range = NULL;
* by default */
static gboolean kafka_show_string_bytes_lengths = FALSE;
-typedef gint16 kafka_api_key_t;
-typedef gint16 kafka_api_version_t;
-typedef gint16 kafka_error_t;
-typedef gint32 kafka_partition_t;
-typedef gint64 kafka_offset_t;
-
typedef struct _kafka_query_response_t {
kafka_api_key_t api_key;
kafka_api_version_t api_version;
@@ -317,13 +352,69 @@ dissect_kafka_message_set(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, i
static const char *
kafka_error_to_str(kafka_error_t error)
{
- return val_to_str_const(error, kafka_errors, "Unknown");
+ return val_to_str(error, kafka_errors, "Unknown %d");
}
static const char *
kafka_api_key_to_str(kafka_api_key_t api_key)
{
- return val_to_str_const(api_key, kafka_apis, "Unknown");
+ return val_to_str(api_key, kafka_api_names, "Unknown %d");
+}
+
+static const kafka_api_info_t *
+kafka_get_api_info(kafka_api_key_t api_key)
+{
+ if ((api_key >= 0) && (api_key < ((kafka_api_key_t) array_length(kafka_apis)))) {
+ return &kafka_apis[api_key];
+ } else {
+ return NULL;
+ }
+}
+
+static gboolean
+kafka_is_api_version_supported(const kafka_api_info_t *api_info, kafka_api_version_t api_version)
+{
+ DISSECTOR_ASSERT(api_info);
+
+ return !(api_info->min_version == -1 ||
+ api_version < api_info->min_version ||
+ api_version > api_info->max_version);
+}
+
+static void
+kafka_check_supported_api_key(packet_info *pinfo, proto_item *ti, kafka_query_response_t *matcher)
+{
+ if (kafka_get_api_info(matcher->api_key) == NULL) {
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [Unknown API key]");
+ expert_add_info_format(pinfo, ti, &ei_kafka_unknown_api_key,
+ "%s API key", kafka_api_key_to_str(matcher->api_key));
+ }
+}
+
+static void
+kafka_check_supported_api_version(packet_info *pinfo, proto_item *ti, kafka_query_response_t *matcher)
+{
+ const kafka_api_info_t *api_info;
+
+ api_info = kafka_get_api_info(matcher->api_key);
+ if (api_info != NULL && !kafka_is_api_version_supported(api_info, matcher->api_version)) {
+ col_append_fstr(pinfo->cinfo, COL_INFO, " [Unsupported API version]");
+ if (api_info->min_version == -1) {
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version.",
+ kafka_api_key_to_str(matcher->api_key));
+ }
+ else if (api_info->min_version == api_info->max_version) {
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version. Supports v%d.",
+ kafka_api_key_to_str(matcher->api_key), api_info->min_version);
+ } else {
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version. Supports v%d-%d.",
+ kafka_api_key_to_str(matcher->api_key),
+ api_info->min_version, api_info->max_version);
+ }
+ }
}
static guint
@@ -1660,12 +1751,13 @@ dissect_kafka_api_versions_request(tvbuff_t *tvb _U_, packet_info *pinfo _U_, pr
static int
dissect_kafka_api_versions_response_api_version(tvbuff_t *tvb, packet_info *pinfo _U_, proto_tree *tree,
- int start_offset, kafka_api_version_t api_version _U_)
+ int offset, kafka_api_version_t api_version _U_)
{
proto_item *ti;
proto_tree *subtree;
- int offset = start_offset;
- kafka_api_version_t api_key, min_version, max_version;
+ kafka_api_key_t api_key;
+ kafka_api_version_t min_version, max_version;
+ const kafka_api_info_t *api_info;
subtree = proto_tree_add_subtree(tree, tvb, offset, -1, ett_kafka_api_version, &ti,
"API Version");
@@ -1682,7 +1774,7 @@ dissect_kafka_api_versions_response_api_version(tvbuff_t *tvb, packet_info *pinf
proto_tree_add_item(subtree, hf_kafka_api_versions_max_version, tvb, offset, 2, ENC_BIG_ENDIAN);
offset += 2;
- proto_item_set_len(ti, offset - start_offset);
+ proto_item_set_end(ti, tvb, offset);
if (max_version != min_version) {
/* Range of versions supported. */
proto_item_append_text(subtree, " %s (v%d-%d)",
@@ -1696,6 +1788,36 @@ dissect_kafka_api_versions_response_api_version(tvbuff_t *tvb, packet_info *pinf
min_version);
}
+ api_info = kafka_get_api_info(api_key);
+ if (api_info == NULL) {
+ proto_item_append_text(subtree, " [Unknown API key]");
+ expert_add_info_format(pinfo, ti, &ei_kafka_unknown_api_key,
+ "%s API key", kafka_api_key_to_str(api_key));
+ }
+ else if (!kafka_is_api_version_supported(api_info, min_version) ||
+ !kafka_is_api_version_supported(api_info, max_version)) {
+ if (api_info->min_version == -1) {
+ proto_item_append_text(subtree, " [Unsupported API version]");
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version.",
+ kafka_api_key_to_str(api_key));
+ }
+ else if (api_info->min_version == api_info->max_version) {
+ proto_item_append_text(subtree, " [Unsupported API version. Supports v%d]",
+ api_info->min_version);
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version. Supports v%d.",
+ kafka_api_key_to_str(api_key), api_info->min_version);
+ } else {
+ proto_item_append_text(subtree, " [Unsupported API version. Supports v%d-%d]",
+ api_info->min_version, api_info->max_version);
+ expert_add_info_format(pinfo, ti, &ei_kafka_unsupported_api_version,
+ "Unsupported %s version. Supports v%d-%d.",
+ kafka_api_key_to_str(api_key),
+ api_info->min_version, api_info->max_version);
+ }
+ }
+
return offset;
}
@@ -3060,11 +3182,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
PROTO_ITEM_SET_GENERATED(ti);
}
- proto_tree_add_item(kafka_tree, hf_kafka_request_api_key, tvb, offset, 2, ENC_BIG_ENDIAN);
+ ti = proto_tree_add_item(kafka_tree, hf_kafka_request_api_key, tvb, offset, 2, ENC_BIG_ENDIAN);
offset += 2;
+ kafka_check_supported_api_key(pinfo, ti, matcher);
- proto_tree_add_item(kafka_tree, hf_kafka_request_api_version, tvb, offset, 2, ENC_BIG_ENDIAN);
+ ti = proto_tree_add_item(kafka_tree, hf_kafka_request_api_version, tvb, offset, 2, ENC_BIG_ENDIAN);
offset += 2;
+ kafka_check_supported_api_version(pinfo, ti, matcher);
proto_tree_add_item(kafka_tree, hf_kafka_correlation_id, tvb, offset, 4, ENC_BIG_ENDIAN);
offset += 4;
@@ -3184,12 +3308,13 @@ dissect_kafka(tvbuff_t *tvb, packet_info *pinfo, proto_tree *tree, void* data _U
ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_key, tvb,
0, 0, matcher->api_key);
PROTO_ITEM_SET_GENERATED(ti);
+ kafka_check_supported_api_key(pinfo, ti, matcher);
/* Also show api version from request */
ti = proto_tree_add_int(kafka_tree, hf_kafka_response_api_version, tvb,
0, 0, matcher->api_version);
PROTO_ITEM_SET_GENERATED(ti);
-
+ kafka_check_supported_api_version(pinfo, ti, matcher);
switch (matcher->api_key) {
case KAFKA_PRODUCE:
@@ -3279,6 +3404,21 @@ apply_kafka_prefs(void) {
current_kafka_tcp_range = prefs_get_range_value("kafka", "tcp.port");
}
+static void
+compute_kafka_api_names(void)
+{
+ guint i;
+ guint len = array_length(kafka_apis);
+
+ for (i = 0; i < len; ++i) {
+ kafka_api_names[i].value = kafka_apis[i].api_key;
+ kafka_api_names[i].strptr = kafka_apis[i].name;
+ }
+
+ kafka_api_names[len].value = 0;
+ kafka_api_names[len].strptr = NULL;
+}
+
void
proto_register_kafka(void)
{
@@ -3315,12 +3455,12 @@ proto_register_kafka(void)
},
{ &hf_kafka_request_api_key,
{ "API Key", "kafka.request_key",
- FT_INT16, BASE_DEC, VALS(kafka_apis), 0,
+ FT_INT16, BASE_DEC, VALS(kafka_api_names), 0,
"Request API.", HFILL }
},
{ &hf_kafka_response_api_key,
{ "API Key", "kafka.response_key",
- FT_INT16, BASE_DEC, VALS(kafka_apis), 0,
+ FT_INT16, BASE_DEC, VALS(kafka_api_names), 0,
"Response API.", HFILL }
},
{ &hf_kafka_request_api_version,
@@ -3560,7 +3700,7 @@ proto_register_kafka(void)
},
{ &hf_kafka_api_versions_api_key,
{ "API Key", "kafka.api_versions.api_key",
- FT_INT16, BASE_DEC, VALS(kafka_apis), 0,
+ FT_INT16, BASE_DEC, VALS(kafka_api_names), 0,
"API Key.", HFILL }
},
{ &hf_kafka_api_versions_min_version,
@@ -3692,6 +3832,10 @@ proto_register_kafka(void)
static ei_register_info ei[] = {
{ &ei_kafka_request_missing,
{ "kafka.request_missing", PI_UNDECODED, PI_WARN, "Request missing", EXPFILL }},
+ { &ei_kafka_unknown_api_key,
+ { "kafka.unknown_api_key", PI_UNDECODED, PI_WARN, "Unknown API key", EXPFILL }},
+ { &ei_kafka_unsupported_api_version,
+ { "kafka.unsupported_api_version", PI_UNDECODED, PI_WARN, "Unsupported API version", EXPFILL }},
{ &ei_kafka_message_decompress,
{ "kafka.decompress_failed", PI_UNDECODED, PI_WARN, "Failed to decompress message", EXPFILL }},
{ &ei_kafka_bad_string_length,
@@ -3705,6 +3849,7 @@ proto_register_kafka(void)
proto_kafka = proto_register_protocol("Kafka", "Kafka", "kafka");
+ compute_kafka_api_names();
proto_register_field_array(proto_kafka, hf, array_length(hf));
proto_register_subtree_array(ett, array_length(ett));
expert_kafka = expert_register_protocol(proto_kafka);
@@ -3716,7 +3861,6 @@ proto_register_kafka(void)
"Show length for string and bytes fields in the protocol tree",
"",
&kafka_show_string_bytes_lengths);
-
}
void