Easy Script to get all US Counties Borders in a Dash Map

I built out a Dash application that fetches all Declared Natural Disasters from the FEMA API, in this process I wanted to map each county that has declared a disaster. Outside of this in a previous Figure Friday focused on AWS data along with countless projects looking to break the USA down into sections I figured it would be worth sharing with the Plotly community how you can easily create County Borders for the USA. With this I created a script that will hit the census.gov and fetch the current county borders and save them by state to a data/states/counties folder.

import requests
from pathlib import Path
import zipfile
import geopandas as gpd


def download_county_data():
    """
    Downloads county border data for all US states from the Census Bureau TIGER/Line
    and saves them as GeoJSON files to data/states/counties/
    """
    # Create directory structure if it doesn't exist
    base_dir = Path("data/states/counties")
    base_dir.mkdir(parents=True, exist_ok=True)
    temp_dir = Path("temp")
    temp_dir.mkdir(exist_ok=True)

    # TIGER/Line FTP URL for 2023 county boundaries
    base_url = "https://www2.census.gov/geo/tiger/TIGER2023/COUNTY"
    filename = "tl_2023_us_county.zip"
    url = f"{base_url}/{filename}"

    print("Downloading US counties shapefile...")
    try:
        # Download the zipfile
        response = requests.get(url, stream=True)
        response.raise_for_status()

        zip_path = temp_dir / filename
        with open(zip_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)

        # Extract the zipfile
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)

        # Read the shapefile with geopandas
        shp_path = temp_dir / "tl_2023_us_county.shp"
        gdf = gpd.read_file(shp_path)

        # Dictionary of state FIPS codes for reference
        state_fips = {
            'AL': '01', 'AK': '02', 'AZ': '04', 'AR': '05', 'CA': '06', 'CO': '08', 'CT': '09',
            'DE': '10', 'FL': '12', 'GA': '13', 'HI': '15', 'ID': '16', 'IL': '17', 'IN': '18',
            'IA': '19', 'KS': '20', 'KY': '21', 'LA': '22', 'ME': '23', 'MD': '24', 'MA': '25',
            'MI': '26', 'MN': '27', 'MS': '28', 'MO': '29', 'MT': '30', 'NE': '31', 'NV': '32',
            'NH': '33', 'NJ': '34', 'NM': '35', 'NY': '36', 'NC': '37', 'ND': '38', 'OH': '39',
            'OK': '40', 'OR': '41', 'PA': '42', 'RI': '44', 'SC': '45', 'SD': '46', 'TN': '47',
            'TX': '48', 'UT': '49', 'VT': '50', 'VA': '51', 'WA': '53', 'WV': '54', 'WI': '55',
            'WY': '56'
        }

        # Invert the state_fips dictionary for lookup
        fips_to_state = {v: k for k, v in state_fips.items()}

        # Split and save by state
        for state_fips, state_abbr in fips_to_state.items():
            print(f"Processing {state_abbr}...")
            state_counties = gdf[gdf['STATEFP'] == state_fips]
            if not state_counties.empty:
                output_file = base_dir / f"{state_abbr.lower()}_counties.geojson"
                state_counties.to_file(output_file, driver='GeoJSON')
                print(f"Successfully saved {state_abbr} counties")

        # Clean up temporary files
        for file in temp_dir.glob("tl_2023_us_county.*"):
            file.unlink()
        zip_path.unlink()
        temp_dir.rmdir()

        print("Successfully completed downloading and processing all county data")

    except requests.exceptions.RequestException as e:
        print(f"Error downloading county data: {e}")
    except Exception as e:
        print(f"Error processing county data: {e}")


if __name__ == "__main__":
    download_county_data()

Then in your dash application you can just use these directly when building a leaflet map like so:

# Initialize a dictionary to store all state GeoJSON data
state_geojson_data = {}

# Get the path to the counties directory
counties_dir = Path('data/states/counties')

# Loop through all GeoJSON files in the directory
for geojson_file in counties_dir.glob('*.geojson'):
    # Get state name from filename (assuming format like 'texas_counties.geojson' or 'texas_county_boundaries.geojson')
    state_name = geojson_file.stem.split('_')[0].lower()

    # Load the GeoJSON data
    try:
        with open(geojson_file) as f:
            state_geojson_data[state_name] = json.load(f)
    except Exception as e:
        print(f"Error loading {geojson_file}: {e}")

# Create map layers dynamically
map_layers = [dl.TileLayer()]
map_layers.extend([
    dl.GeoJSON(
        data=geojson_data,
        id=f"{state}-counties-layer",
        hideout=dict(selected=[]),
        style=style_handler
    )
    for state, geojson_data in state_geojson_data.items()
])

Then just put that map_layer in a dl.Map like:

dl.Map(
    id='disaster-map',
    center=[31.0686, -99.9018],
    zoom=6,
    children=map_layers,
    style={'width': '100%', 'height': '50vh', "zIndex": 0}
)

Hope this helps with a data science project.

Cheers,
Pip

2 Likes

This would be really helpful. Thank you @PipInstallPython . I frequently hear from community members how they would like access to county border in the US.

I tried running your code to download the county data, but I’m getting this error:

Error processing county data: [WinError 2] The system cannot find the file specified: ‘temp\tl_2023_us_county.zip’

Any idea what might be happening? I see that the temp folder was created on my computer and it’s empty.

try this:

import requests
from pathlib import Path
import zipfile
import geopandas as gpd
import os


def download_county_data():
    """
    Downloads county border data for all US states from the Census Bureau TIGER/Line
    and saves them as GeoJSON files to data/states/counties/
    """
    # Create directory structure if it doesn't exist
    base_dir = Path("data/states/counties")
    base_dir.mkdir(parents=True, exist_ok=True)
    temp_dir = Path("temp")
    temp_dir.mkdir(exist_ok=True)

    # TIGER/Line FTP URL for 2023 county boundaries
    base_url = "https://www2.census.gov/geo/tiger/TIGER2023/COUNTY"
    filename = "tl_2023_us_county.zip"
    url = f"{base_url}/{filename}"

    print("Downloading US counties shapefile...")
    try:
        # Download the zipfile
        response = requests.get(url, stream=True)
        response.raise_for_status()

        zip_path = temp_dir / filename
        print(f"Saving zip file to: {zip_path.absolute()}")
        
        # Ensure the file is written completely
        with open(zip_path, 'wb') as f:
            total_size = int(response.headers.get('content-length', 0))
            block_size = 8192
            downloaded = 0
            
            for chunk in response.iter_content(chunk_size=block_size):
                if chunk:
                    f.write(chunk)
                    downloaded += len(chunk)
                    if total_size:
                        percent = int((downloaded / total_size) * 100)
                        print(f"\rDownload progress: {percent}%", end='')
        
        print("\nVerifying downloaded file...")
        if not zip_path.exists():
            raise FileNotFoundError(f"Failed to save zip file to {zip_path}")
        
        print(f"File size: {zip_path.stat().st_size:,} bytes")

        print("Extracting zip file...")
        # Extract the zipfile
        with zipfile.ZipFile(zip_path, 'r') as zip_ref:
            zip_ref.extractall(temp_dir)
            extracted_files = zip_ref.namelist()
            print(f"Extracted files: {extracted_files}")

        # Read the shapefile with geopandas
        shp_path = temp_dir / "tl_2023_us_county.shp"
        if not shp_path.exists():
            raise FileNotFoundError(f"Shapefile not found at {shp_path}")
            
        print("Reading shapefile with GeoPandas...")
        gdf = gpd.read_file(shp_path)
        print(f"Successfully loaded {len(gdf)} counties")

        # Dictionary of state FIPS codes for reference
        state_fips = {
            'AL': '01', 'AK': '02', 'AZ': '04', 'AR': '05', 'CA': '06', 'CO': '08', 'CT': '09',
            'DE': '10', 'FL': '12', 'GA': '13', 'HI': '15', 'ID': '16', 'IL': '17', 'IN': '18',
            'IA': '19', 'KS': '20', 'KY': '21', 'LA': '22', 'ME': '23', 'MD': '24', 'MA': '25',
            'MI': '26', 'MN': '27', 'MS': '28', 'MO': '29', 'MT': '30', 'NE': '31', 'NV': '32',
            'NH': '33', 'NJ': '34', 'NM': '35', 'NY': '36', 'NC': '37', 'ND': '38', 'OH': '39',
            'OK': '40', 'OR': '41', 'PA': '42', 'RI': '44', 'SC': '45', 'SD': '46', 'TN': '47',
            'TX': '48', 'UT': '49', 'VT': '50', 'VA': '51', 'WA': '53', 'WV': '54', 'WI': '55',
            'WY': '56'
        }

        # Invert the state_fips dictionary for lookup
        fips_to_state = {v: k for k, v in state_fips.items()}

        # Split and save by state
        for state_fips, state_abbr in fips_to_state.items():
            print(f"Processing {state_abbr}...")
            state_counties = gdf[gdf['STATEFP'] == state_fips]
            if not state_counties.empty:
                output_file = base_dir / f"{state_abbr.lower()}_counties.geojson"
                state_counties.to_file(output_file, driver='GeoJSON')
                print(f"Successfully saved {state_abbr} counties")

        print("Cleaning up temporary files...")
        # Clean up temporary files
        for file in temp_dir.glob("tl_2023_us_county.*"):
            try:
                file.unlink()
                print(f"Deleted: {file}")
            except Exception as e:
                print(f"Warning: Could not delete {file}: {e}")
        
        try:
            zip_path.unlink()
            print(f"Deleted: {zip_path}")
        except Exception as e:
            print(f"Warning: Could not delete zip file: {e}")
            
        try:
            temp_dir.rmdir()
            print("Deleted temp directory")
        except Exception as e:
            print(f"Warning: Could not delete temp directory: {e}")

        print("Successfully completed downloading and processing all county data")

    except requests.exceptions.RequestException as e:
        print(f"Error downloading county data: {e}")
    except zipfile.BadZipFile as e:
        print(f"Error with zip file: {e}")
    except Exception as e:
        print(f"Error processing county data: {e}")
        print(f"Current working directory: {os.getcwd()}")
        if 'zip_path' in locals():
            print(f"Zip file exists: {zip_path.exists()}")
            if zip_path.exists():
                print(f"Zip file size: {zip_path.stat().st_size:,} bytes")


if __name__ == "__main__":
    download_county_data()

My setup was based on mac think your computer was just having trouble finding the zip file. If that doesn’t work try creating a temp folder in the root and that will probably work

I think I got it. Thank you @PipInstallPython .
By the way, can you share the code for the style_handler object you assigned to style in case that’s relevant?

Sure:

style_handler = assign("""function(feature, context){
    const affected_counties = context.hideout.affected_counties || [];
    if(affected_counties.includes(feature.properties.NAME)){
        return {fillColor: 'red', fillOpacity: 0.7, weight: 2, color: 'white'};
    }
    return {fillColor: 'grey', fillOpacity: 0.3, weight: 2, color: 'white'};
}""")

This was just used to make affected_counties turn red if their was a fema event associated with the county and the selected fema type

1 Like

Thank you @PipInstallPython .
I’ll leave that styling out for this example.

Here is the full executable code with Dash. Thank you for creating this example, @PipInstallPython . It’s going to be helpful for many people.

import requests
from pathlib import Path
import zipfile
import geopandas as gpd
import os
from dash import Dash, dcc, callback, Output, Input
import dash_leaflet as dl
import json

app = Dash()


# Initialize a dictionary to store all state GeoJSON data
state_geojson_data = {}

# Get the path to the counties directory
counties_dir = Path('data/states/counties')

# Loop through all GeoJSON files in the directory
for geojson_file in counties_dir.glob('*.geojson'):
    # Get state name from filename (assuming format like 'texas_counties.geojson' or 'texas_county_boundaries.geojson')
    state_name = geojson_file.stem.split('_')[0].lower()

    # Load the GeoJSON data
    try:
        with open(geojson_file) as f:
            state_geojson_data[state_name] = json.load(f)
    except Exception as e:
        print(f"Error loading {geojson_file}: {e}")

# Create map layers dynamically
map_layers = [dl.TileLayer()]
map_layers.extend([
    dl.GeoJSON(
        data=geojson_data,
        id=f"{state}-counties-layer",
        hideout=dict(selected=[]),
        # style=style_handler
    )
    for state, geojson_data in state_geojson_data.items()
])

my_map = dl.Map(
    id='disaster-map',
    center=[31.0686, -99.9018],
    zoom=6,
    children=map_layers,
    style={'width': '100%', 'height': '50vh', "zIndex": 0}
)

app.layout = [
    my_map
]

if __name__ == '__main__':
    app.run_server(debug=True)

1 Like