Add webhook block private setting
parent
8b130f0361
commit
e37eb65431
|
@ -547,6 +547,7 @@ EXPORTS_TTL = 60 * 60 * 24 # 24 hours
|
|||
|
||||
CELERY_ENABLED = False
|
||||
WEBHOOKS_ENABLED = False
|
||||
WEBHOOKS_BLOCK_PRIVATE_ADDRESS = False
|
||||
|
||||
|
||||
# If is True /front/sitemap.xml show a valid sitemap of taiga-front client
|
||||
|
|
|
@ -50,7 +50,7 @@ def reverse(viewname, *args, **kwargs):
|
|||
return get_absolute_url(django_reverse(viewname, *args, **kwargs))
|
||||
|
||||
|
||||
class HostnameValueError(Exception):
|
||||
class HostnameException(Exception):
|
||||
pass
|
||||
|
||||
|
||||
|
@ -65,7 +65,7 @@ def validate_private_url(url):
|
|||
try:
|
||||
socket_args, *others = socket.getaddrinfo(host, port)
|
||||
except Exception:
|
||||
raise HostnameValueError(_("Host access error"))
|
||||
raise HostnameException(_("Host access error"))
|
||||
|
||||
destination_address = socket_args[4][0]
|
||||
try:
|
||||
|
|
|
@ -21,6 +21,8 @@ import hashlib
|
|||
import requests
|
||||
from requests.exceptions import RequestException
|
||||
|
||||
from django.conf import settings
|
||||
|
||||
from taiga.base.api.renderers import UnicodeJSONRenderer
|
||||
from taiga.base.utils import json, urls
|
||||
from taiga.base.utils.db import get_typename_for_model_instance
|
||||
|
@ -64,6 +66,15 @@ def _generate_signature(data, key):
|
|||
return mac.hexdigest()
|
||||
|
||||
|
||||
def _remove_leftover_webhooklogs(webhook_id):
|
||||
# Only the last ten webhook logs traces are required
|
||||
# so remove the leftover
|
||||
ids = (WebhookLog.objects.filter(webhook_id=webhook_id)
|
||||
.order_by("-id")
|
||||
.values_list('id', flat=True)[10:])
|
||||
WebhookLog.objects.filter(id__in=ids).delete()
|
||||
|
||||
|
||||
def _send_request(webhook_id, url, key, data):
|
||||
serialized_data = UnicodeJSONRenderer().render(data)
|
||||
signature = _generate_signature(serialized_data, key)
|
||||
|
@ -73,19 +84,22 @@ def _send_request(webhook_id, url, key, data):
|
|||
"Content-Type": "application/json"
|
||||
}
|
||||
|
||||
try:
|
||||
urls.validate_destination_address(url)
|
||||
except urls.IpAddresValueError as e:
|
||||
# Error validating url
|
||||
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
|
||||
status=0,
|
||||
request_data=data,
|
||||
request_headers=dict(),
|
||||
response_data="error-in-request: {}".format(
|
||||
str(e)),
|
||||
response_headers={},
|
||||
duration=0)
|
||||
return webhook_log
|
||||
if settings.WEBHOOKS_BLOCK_PRIVATE_ADDRESS:
|
||||
try:
|
||||
urls.validate_private_url(url)
|
||||
except (urls.IpAddresValueError, urls.HostnameException) as e:
|
||||
# Error validating url
|
||||
webhook_log = WebhookLog.objects.create(webhook_id=webhook_id, url=url,
|
||||
status=0,
|
||||
request_data=data,
|
||||
request_headers=dict(),
|
||||
response_data="error-in-request: {}".format(
|
||||
str(e)),
|
||||
response_headers={},
|
||||
duration=0)
|
||||
return webhook_log
|
||||
finally:
|
||||
_remove_leftover_webhooklogs(webhook_id)
|
||||
|
||||
request = requests.Request('POST', url, data=serialized_data, headers=headers)
|
||||
prepared_request = request.prepare()
|
||||
|
@ -114,12 +128,7 @@ def _send_request(webhook_id, url, key, data):
|
|||
response_headers=dict(response.headers),
|
||||
duration=response.elapsed.total_seconds())
|
||||
finally:
|
||||
# Only the last ten webhook logs traces are required
|
||||
# so remove the leftover
|
||||
ids = (WebhookLog.objects.filter(webhook_id=webhook_id)
|
||||
.order_by("-id")
|
||||
.values_list('id', flat=True)[10:])
|
||||
WebhookLog.objects.filter(id__in=ids).delete()
|
||||
_remove_leftover_webhooklogs(webhook_id)
|
||||
|
||||
return webhook_log
|
||||
|
||||
|
|
|
@ -16,13 +16,14 @@
|
|||
# You should have received a copy of the GNU Affero General Public License
|
||||
# along with this program. If not, see <http://www.gnu.org/licenses/>.
|
||||
import ipaddress
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from django.conf import settings
|
||||
from django.utils.translation import ugettext as _
|
||||
|
||||
from taiga.base.api import validators
|
||||
from urllib.parse import urlparse
|
||||
|
||||
from taiga.base.exceptions import ValidationError
|
||||
|
||||
from .models import Webhook
|
||||
|
||||
|
||||
|
@ -31,11 +32,12 @@ class WebhookValidator(validators.ModelValidator):
|
|||
model = Webhook
|
||||
|
||||
def validate_url(self, attrs, source):
|
||||
host = urlparse(attrs[source]).hostname
|
||||
try:
|
||||
ipa = ipaddress.ip_address(host)
|
||||
except ValueError:
|
||||
if settings.WEBHOOKS_BLOCK_PRIVATE_ADDRESS:
|
||||
host = urlparse(attrs[source]).hostname
|
||||
try:
|
||||
ipa = ipaddress.ip_address(host)
|
||||
except ValueError:
|
||||
return attrs
|
||||
if ipa.is_private:
|
||||
raise ValidationError(_("Not allowed IP Address"))
|
||||
return attrs
|
||||
if ipa.is_private:
|
||||
raise ValidationError(_("Not allowed IP Address"))
|
||||
return attrs
|
||||
|
|
|
@ -55,7 +55,7 @@ def test_webhook_action_test_transform_to_json(client, data):
|
|||
response.elapsed.total_seconds.return_value = 100
|
||||
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response), \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
client.login(data.project_owner)
|
||||
response = client.json.post(url)
|
||||
assert response.status_code == 200
|
||||
|
|
|
@ -45,25 +45,25 @@ def test_new_object_with_one_webhook_signal(settings):
|
|||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test")
|
||||
assert session_send_mock.call_count == 1
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner)
|
||||
assert session_send_mock.call_count == 0
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test")
|
||||
assert session_send_mock.call_count == 1
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
|
||||
assert session_send_mock.call_count == 1
|
||||
|
||||
|
@ -86,25 +86,25 @@ def test_new_object_with_two_webhook_signals(settings):
|
|||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test")
|
||||
assert session_send_mock.call_count == 2
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test")
|
||||
assert session_send_mock.call_count == 2
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner)
|
||||
assert session_send_mock.call_count == 0
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
|
||||
assert session_send_mock.call_count == 2
|
||||
|
||||
|
@ -126,12 +126,12 @@ def test_send_request_one_webhook_signal(settings):
|
|||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test")
|
||||
assert session_send_mock.call_count == 1
|
||||
|
||||
for obj in objects:
|
||||
with patch("taiga.webhooks.tasks.requests.Session.send", return_value=response) as session_send_mock, \
|
||||
patch("taiga.base.utils.urls.validate_destination_address", return_value=True):
|
||||
patch("taiga.base.utils.urls.validate_private_url", return_value=True):
|
||||
services.take_snapshot(obj, user=obj.owner, comment="test", delete=True)
|
||||
assert session_send_mock.call_count == 1
|
||||
|
|
|
@ -23,7 +23,7 @@ import django_sites as sites
|
|||
import re
|
||||
|
||||
from taiga.base.utils.urls import get_absolute_url, is_absolute_url, build_url, \
|
||||
validate_private_url, IpAddresValueError
|
||||
validate_private_url, IpAddresValueError, HostnameException
|
||||
from taiga.base.utils.db import save_in_bulk, update_in_bulk, to_tsquery
|
||||
|
||||
pytestmark = pytest.mark.django_db
|
||||
|
@ -103,6 +103,8 @@ TS_QUERY_TRANSFORMATIONS = [
|
|||
('""', "'\"\"':*"),
|
||||
('"""', "'\"\"':* & '\"':*"),
|
||||
]
|
||||
|
||||
|
||||
def test_to_tsquery():
|
||||
for (input, expected) in TS_QUERY_TRANSFORMATIONS:
|
||||
expected = re.sub("([0-9])", r"'\1':*", expected)
|
||||
|
@ -121,13 +123,21 @@ def test_to_tsquery():
|
|||
"http://[::ffff:c0a8:164]/",
|
||||
"scp://192.168.1.100/",
|
||||
"http://www.192.168.1.100.xip.io/",
|
||||
"http://test.local/",
|
||||
])
|
||||
def test_validate_bad_destination_address(url):
|
||||
with pytest.raises(IpAddresValueError):
|
||||
validate_private_url(url)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url", [
|
||||
"http://test.local/",
|
||||
"http://test.test/",
|
||||
])
|
||||
def test_validate_invalid_destination_address(url):
|
||||
with pytest.raises(HostnameException):
|
||||
validate_private_url(url)
|
||||
|
||||
|
||||
@pytest.mark.parametrize("url", [
|
||||
"http://192.167.0.12",
|
||||
"http://11.0.0.1",
|
||||
|
|
Loading…
Reference in New Issue