{ "cells": [ { "cell_type": "markdown", "id": "394753bc-ab2b-417a-a98a-ba988bd62edd", "metadata": { "tags": [] }, "source": [ "# Wetterdaten Importieren" ] }, { "cell_type": "markdown", "id": "c557767d-2319-441a-8b45-6fe8e4bbfb32", "metadata": {}, "source": [ "Die Wetterdaten vom DWD werden über einen OpenData Server bereitgestellt. Um an diese Daten zu kommen müssen diese relativ komplex heruntergeladen und zusammengefügt werden." ] }, { "cell_type": "markdown", "id": "7abd6877-b35f-4604-ba57-399234b97281", "metadata": {}, "source": [ "Als erstes werden vorbereitungen dafür die Daten zu Importieren getroffen. Dazu werden die benötigten Bibliotehken importiert und einige Variablen gesetzt.\n", "\n", "Außerdem wird ein Ordner angelget in dem die heruntergeladenen Daten gespeichert werde können." ] }, { "cell_type": "code", "execution_count": 8, "id": "c87fe05a-63e3-4748-a01a-d46cb12e9b05", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Fertig\n" ] } ], "source": [ "from operator import contains\n", "import requests\n", "import os\n", "\n", "import zipfile\n", "import io\n", "import pandas as pd\n", "\n", "url = 'https://opendata.dwd.de/climate_environment/CDC/observations_germany/climate/10_minutes/air_temperature/now/'\n", "download_folder = 'dwd-data/'\n", "\n", "from datetime import datetime\n", "\n", "from influxdb_client import InfluxDBClient, Point, WritePrecision, BucketRetentionRules\n", "from influxdb_client.client.write_api import SYNCHRONOUS\n", "\n", "token = \"wb4s191jc33JQ4a6wK3ZECwrrG3LuSyQd61akFa_q6ZCEsequUvFhL9Gre6FaZMA2ElCylKz26ByJ6RetkQaGQ==\"\n", "org = \"test-org\"\n", "bucket = \"dwd_now\"\n", "influx_url = \"http://influxdb:8086\"\n", "\n", "if not os.path.isdir(download_folder):\n", " print(\"Daten Ordner erstellt\")\n", " os.mkdir(download_folder)\n", "\n", "print(\"Fertig\")" ] }, { "cell_type": "markdown", "id": "7cad1e52-4d22-4dc5-952c-3578d73280ec", "metadata": {}, "source": [ "Um die Daten später Importieren zu können muss zuächst ein Bucket in der Datenbank angelegt werden. Wenn das Bucket schon vorhanden ist wird es gelöscht und erneut angelegt damit keine Doppelten Daten vorhandene sind." ] }, { "cell_type": "code", "execution_count": 9, "id": "b9acf473-2f26-40c6-9c48-1a4ec159bd3d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Vorhandes Bucket löschen\n", "Bucket angelegt\n" ] } ], "source": [ "with InfluxDBClient(url=influx_url, token=token) as client:\n", " buckets_api = client.buckets_api()\n", " buckets = buckets_api.find_buckets().buckets \n", " data_bucket = [x for x in buckets if x.name == bucket]\n", " \n", " if len(data_bucket) > 0:\n", " print(\"Vorhandes Bucket löschen\")\n", " buckets_api.delete_bucket(data_bucket[0])\n", " \n", " retention_rules = BucketRetentionRules(type=\"expire\", every_seconds=86400)\n", " created_bucket = buckets_api.create_bucket(bucket_name=bucket, retention_rules=retention_rules, org=org)\n", " \n", " print(\"Bucket angelegt\")" ] }, { "cell_type": "markdown", "id": "88787497-ec8d-47ed-b885-d1a1cfd443e2", "metadata": {}, "source": [ "Um die Daten von der Webseite zu bekommen wird mittel ScreenScrapping jeder Link zu einer der gezipten csv Datein gesucht. Dafür wird BeautifulSoup genutzt. Damit BeautifulSoup die Links finden kann muss zunächst einmal die HTML Datei heruntergeladen werden. " ] }, { "cell_type": "code", "execution_count": 10, "id": "90f1eb08-b4dd-4743-ad38-492bfd742fec", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Download\n", "\n", "10minutenwerte_TU_00071_now.zip\n" ] } ], "source": [ "print(\"Download\")\n", "response = requests.get(url)\n", "print(response)\n", "\n", "from bs4 import BeautifulSoup\n", "\n", "soup = BeautifulSoup(response.text, 'html.parser')\n", "\n", "dwd_links = soup.findAll('a')\n", "\n", "print(dwd_links[2])\n" ] }, { "cell_type": "markdown", "id": "ac3c644a-cac2-41b5-9be0-f01bcb9a40cc", "metadata": {}, "source": [ "Die so gefilterten Links werden dann in dieser Schleife heruntergeladen und gespeichert. Der pfad für die Stationsbeschreibungsdatei wird in eine extra Variable geschrieben um später die Daten der Stationen zu bekommen." ] }, { "cell_type": "code", "execution_count": 11, "id": "2524986b-9c26-42d5-8d76-f4e228d0eb48", "metadata": { "tags": [] }, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Download 480 von 480\r" ] } ], "source": [ "download_counter = int(1)\n", "dwd_len = len(dwd_links)\n", "station_file = ''\n", "\n", "for file_text in dwd_links:\n", " \n", " dwd_len = len(dwd_links)\n", " \n", " if (str(file_text.text).__contains__('10minutenwerte')):\n", " dest_file = download_folder + file_text.text\n", " if not os.path.isfile(dest_file): \n", " file_url = url + \"/\" + file_text.text\n", " \n", " download(file_url, dest_file)\n", " elif (str(file_text)).__contains__('Beschreibung_Stationen'):\n", " dest_file = download_folder + file_text.text\n", " file_url = url + \"/\" + file_text.text\n", " download(file_url,dest_file)\n", " station_file = dest_file\n", " \n", " print(\"Download \", download_counter,\" von \",dwd_len, end='\\r')\n", " download_counter += 1\n", " \n", " def download(url, dest_file):\n", " response = requests.get(file_url)\n", " open(dest_file, 'wb').write(response.content)" ] }, { "cell_type": "markdown", "id": "14b90ff2-1473-4e44-9c6b-fdd2d6c20773", "metadata": {}, "source": [ "Zunächst werden die Wetterstationen in die Klasse Station eingelesen. Aus den Klassen wird ein Dictionary erstellt in dem mittels der Stations_id gesucht werden kann. Weil die Stationsdaten nicht als csv gespeichert sind musste ich eine eigene Technik entwickeln um die Daten auszulesen.\n", "\n", "Als erstes wird so lange gelesen bis kein Leerzeichen mehr erkannt wird. Danach wird gelesen bis wieder ein Leerzeichen erkannt wird. Dadurch können die Felder nacheinander eingelesen werden. " ] }, { "cell_type": "code", "execution_count": 13, "id": "430041d7-21fa-47d8-8df9-7933a8749f82", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "dwd-data/zehn_now_tu_Beschreibung_Stationen.txt\n", "Großenkneten \n", "52.9336 \n" ] } ], "source": [ "\n", "class Station:\n", " def __init__(self, Stations_id, Stationshoehe,geoBreite, geoLaenge, Stationsname, Bundesland):\n", " self.Stations_id = Stations_id\n", " self.Stationshoehe = Stationshoehe\n", " self.geoBreite = geoBreite\n", " self.geoLaenge = geoLaenge\n", " self.name = Stationsname\n", " self.Bundesland = Bundesland\n", "\n", "def read_station_file():\n", " \n", " def get_value(i,line, empty_spaces):\n", " value = \"\"\n", " while(line[i] == ' '):\n", " i += 1\n", " spaces = 0\n", " while(spaces < empty_spaces):\n", " if(line[i] == ' '):\n", " spaces += 1\n", " value += line[i]\n", " i += 1\n", " return (i,value)\n", " \n", " f = open(station_file, \"r\", encoding=\"1252\")\n", " i = 0\n", " stations = {}\n", " for line in f:\n", " if i > 1:\n", "\n", " y = 0\n", "\n", " result = get_value(y,line, 1)\n", " Stations_id = str(int(result[1])) #Die Konvertierung in int und zurück zu string entfernt die am Anfang leigenden nullen\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " von_datum = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " bis_datum = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " Stationshoehe = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " geoBreite = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " geoLaenge = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 3)\n", " Stationsname = result[1]\n", " y = result[0]\n", "\n", " result = get_value(y,line, 1)\n", " Bundesland = result[1]\n", " y = result[0]\n", "\n", " station = Station(Stations_id, Stationshoehe, geoBreite, geoLaenge, Stationsname ,Bundesland)\n", " stations[Stations_id] = station\n", "\n", " i+=1\n", " return(stations)\n", "\n", "\n", "print(station_file)\n", "stations = read_station_file()\n", "print(stations[\"44\"].name)" ] }, { "cell_type": "markdown", "id": "81bbb42e-3bd9-4b29-a6e3-11e1d1593307", "metadata": {}, "source": [ "Um an die Messerte in den Datein zu kommen müssen diese entpackt werden." ] }, { "cell_type": "code", "execution_count": 6, "id": "27966795-ee46-4af1-b63c-0f728333ec79", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Import durchgeführt \r" ] } ], "source": [ "def import_data(df):\n", " client = InfluxDBClient(url=influx_url, token=token, org=org)\n", " \n", " write_api = client.write_api(write_options=SYNCHRONOUS)\n", " \n", " error = 0\n", " \n", " for index, row in df.iterrows():\n", " \n", " measurement_time = datetime.strptime(str(int(row[1])),\"%Y%m%d%H%M\")\n", "\n", " #station = stations[str(row[0])].name\n", " \n", " try:\n", " station = stations[str(row[0])].name\n", " #print(station)\n", " except:\n", " print(\"Station unknow\", end='\\r')\n", " else:\n", " try:\n", " p = Point(station)\n", "\n", " #if(row[3]) != -999: p.field(\"PP_10\", row[3])\n", " p.field(\"PP_10\", row[3])\n", " p.field(\"TTL10\",row[4])\n", " p.field(\"TM5_10\", row[5])\n", " p.field(\"RF_10\", row[6])\n", " p.field(\"TD_10\", row[7])\n", "\n", " p.time(measurement_time,WritePrecision.S)\n", " write_api.write(bucket=bucket, record=p)\n", " print(\" \", end='\\r')\n", " print(\"Import Station: \", station , end='\\r')\n", " except:\n", " error += 1\n", " if error < 1:\n", " print(\"Error Import Station: \", station)\n", " client.close()\n", "\n", "def read_dwd_file(file):\n", " df = pd.read_csv(file,sep=';')\n", " #print(df, end='\\r')\n", " #print(df.iat[0,1])\n", " import_data(df)\n", "\n", "\n", "for filename in os.listdir(download_folder):\n", " file_path = os.path.join(download_folder, filename)\n", " if(str(file_path).__contains__('.zip')):\n", " zip=zipfile.ZipFile(file_path)\n", " f=zip.open(zip.namelist()[0])\n", " read_dwd_file(f)\n", " #print(contents)\n", "\n", "print(\" \", end='\\r')\n", "print(\"Import durchgeführt\", end='\\r')" ] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.9.12" } }, "nbformat": 4, "nbformat_minor": 5 }