kafka/client_async.py | 7 ++++++- test/test_assignors.py | 2 +- test/test_conn.py | 17 +++++++++++------ test/test_consumer_integration.py | 5 ++++- 4 files changed, 22 insertions(+), 9 deletions(-) diff --git a/kafka/client_async.py b/kafka/client_async.py index 58f22d4..9db8008 100644 --- a/kafka/client_async.py +++ b/kafka/client_async.py @@ -5,6 +5,7 @@ import copy import logging import random import socket +import sys import threading import time import weakref @@ -29,7 +30,11 @@ from kafka.protocol.metadata import MetadataRequest from kafka.util import Dict, WeakMethod # Although this looks unused, it actually monkey-patches socket.socketpair() # and should be left in as long as we're using socket.socketpair() in this file -from kafka.vendor import socketpair + +# backports.socketpair is bundled as socketpair, +# stdlib's socket.socketpair supports Windows since Python 3.5 +if sys.version_info < (3, 5): + from kafka.vendor import socketpair from kafka.version import __version__ if six.PY2: diff --git a/test/test_assignors.py b/test/test_assignors.py index 016ff8e..a947983 100644 --- a/test/test_assignors.py +++ b/test/test_assignors.py @@ -655,7 +655,7 @@ def test_conflicting_previous_assignments(mocker): 'execution_number,n_topics,n_consumers', [(i, randint(10, 20), randint(20, 40)) for i in range(100)] ) def test_reassignment_with_random_subscriptions_and_changes(mocker, execution_number, n_topics, n_consumers): - all_topics = set(['t{}'.format(i) for i in range(1, n_topics + 1)]) + all_topics = sorted(['t{}'.format(i) for i in range(1, n_topics + 1)]) partitions = dict([(t, set(range(1, i + 1))) for i, t in enumerate(all_topics)]) cluster = create_cluster(mocker, topics=all_topics, topic_partitions_lambda=lambda t: partitions[t]) diff --git a/test/test_conn.py b/test/test_conn.py index 966f7b3..c6bbdc5 100644 --- a/test/test_conn.py +++ b/test/test_conn.py @@ -1,7 +1,7 @@ # pylint: skip-file from __future__ import absolute_import -from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET +from errno import EALREADY, EINPROGRESS, EISCONN, ECONNRESET, EAFNOSUPPORT import socket import mock @@ -287,11 +287,16 @@ def test_lookup_on_connect(): with mock.patch("socket.getaddrinfo", return_value=mock_return2) as m: conn.last_attempt = 0 - conn.connect() - m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) - assert conn._sock_afi == afi2 - assert conn._sock_addr == sockaddr2 - conn.close() + try: + conn.connect() + except socket.error as e: + if e.errno != EAFNOSUPPORT: + raise + else: + m.assert_called_once_with(hostname, port, 0, socket.SOCK_STREAM) + assert conn._sock_afi == afi2 + assert conn._sock_addr == sockaddr2 + conn.close() def test_relookup_on_failure(): diff --git a/test/test_consumer_integration.py b/test/test_consumer_integration.py index 90b7ed2..69a22ee 100644 --- a/test/test_consumer_integration.py +++ b/test/test_consumer_integration.py @@ -275,7 +275,10 @@ def test_kafka_consumer_offsets_search_many_partitions(kafka_consumer, kafka_pro } -@pytest.mark.skipif(env_kafka_version() >= (0, 10, 1), reason="Requires KAFKA_VERSION < 0.10.1") +@pytest.mark.skipif( + not env_kafka_version() or env_kafka_version() >= (0, 10, 1), + reason="Requires KAFKA_VERSION < 0.10.1", +) def test_kafka_consumer_offsets_for_time_old(kafka_consumer, topic): consumer = kafka_consumer tp = TopicPartition(topic, 0)